diff --git a/src/main/java/com/actiontech/dble/net/Session.java b/src/main/java/com/actiontech/dble/net/Session.java index 5c35f260e..62b83f46d 100644 --- a/src/main/java/com/actiontech/dble/net/Session.java +++ b/src/main/java/com/actiontech/dble/net/Session.java @@ -11,9 +11,9 @@ import com.actiontech.dble.net.connection.FrontendConnection; import com.actiontech.dble.net.mysql.MySQLPacket; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.route.parser.util.ParseUtil; +import com.actiontech.dble.statistic.sql.entry.FrontendInfo; import com.actiontech.dble.statistic.trace.AbstractTrackProbe; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -22,6 +22,7 @@ public abstract class Session { protected final AtomicBoolean isMultiStatement = new AtomicBoolean(false); protected volatile String remainingSql = null; protected AbstractTrackProbe trackProbe; + private volatile FrontendInfo traceFrontendInfo; /** * get frontend conn @@ -29,7 +30,16 @@ public abstract class Session { public abstract FrontendConnection getSource(); public void trace(Consumer consumer) { - Optional.ofNullable(trackProbe).ifPresent(consumer); + if (trackProbe != null) { + consumer.accept(trackProbe); + } + } + + public FrontendInfo getTraceFrontendInfo() { + if (traceFrontendInfo == null) { + traceFrontendInfo = new FrontendInfo(this.getSource().getFrontEndService()); + } + return traceFrontendInfo; } public void setHandlerStart(DMLResponseHandler handler) { @@ -105,4 +115,8 @@ public abstract class Session { public void setRemainingSql(String remainingSql) { this.remainingSql = remainingSql; } + + public void setTrackProbe(AbstractTrackProbe trackProbe) { + this.trackProbe = trackProbe; + } } diff --git a/src/main/java/com/actiontech/dble/net/connection/BackendConnection.java b/src/main/java/com/actiontech/dble/net/connection/BackendConnection.java index fb4488070..93607ae0b 100644 --- a/src/main/java/com/actiontech/dble/net/connection/BackendConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/BackendConnection.java @@ -19,6 +19,7 @@ import com.actiontech.dble.net.service.AuthService; import com.actiontech.dble.services.mysqlauthenticate.MySQLBackAuthService; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.singleton.FlowController; +import com.actiontech.dble.statistic.sql.entry.BackendInfo; import com.actiontech.dble.util.TimeUtil; import java.nio.ByteBuffer; @@ -37,6 +38,7 @@ public class BackendConnection extends PooledConnection { private volatile boolean backendWriteFlowControlled; private volatile String bindFront; + private volatile BackendInfo traceBackendInfo; public BackendConnection(NetworkChannel channel, SocketWR socketWR, ReadTimeStatusInstance instance, ResponseHandler handler, String schema) { super(channel, socketWR); @@ -242,6 +244,12 @@ public class BackendConnection extends PooledConnection { return instance.isReadInstance(); } + public BackendInfo getTraceBackendInfo() { + if (traceBackendInfo == null) + traceBackendInfo = new BackendInfo(this); + return traceBackendInfo; + } + @Override public String toString() { // show all return "BackendConnection[id = " + id + " host = " + host + " port = " + port + " localPort = " + localPort + " mysqlId = " + threadId + " db config = " + instance + (bindFront != null ? ", currentBindFrontend = " + bindFront : "") + "]"; diff --git a/src/main/java/com/actiontech/dble/net/response/DefaultResponseHandler.java b/src/main/java/com/actiontech/dble/net/response/DefaultResponseHandler.java index 2cb2ff833..446bd1466 100644 --- a/src/main/java/com/actiontech/dble/net/response/DefaultResponseHandler.java +++ b/src/main/java/com/actiontech/dble/net/response/DefaultResponseHandler.java @@ -67,7 +67,8 @@ public class DefaultResponseHandler implements ProtocolResponseHandler { beforeError(); if (respHand != null) { - Optional.ofNullable(service.getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseEndTime(service))); + if (service.getOriginSession() != null) + service.getOriginSession().trace(t -> t.setBackendResponseEndTime(service)); IODelayProvider.beforeErrorResponse(service); respHand.errorResponse(data, service); } else { @@ -127,7 +128,8 @@ public class DefaultResponseHandler implements ProtocolResponseHandler { protected void closeNoHandler() { if (!service.getConnection().isClosed()) { LOGGER.info("no handler bind in this service " + service); - Optional.ofNullable(service.getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseEndTime(service))); + if (service.getOriginSession() != null) + service.getOriginSession().trace(t -> t.setBackendResponseEndTime(service)); service.getConnection().close("no handler"); } } @@ -146,7 +148,8 @@ public class DefaultResponseHandler implements ProtocolResponseHandler { //LOGGER.info("get into rowing data " + data.length); ResponseHandler respHand = service.getResponseHandler(); if (respHand != null) { - Optional.ofNullable(service.getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendSqlAddRows(service))); + if (service.getOriginSession() != null) + service.getOriginSession().trace(t -> t.setBackendSqlAddRows(service)); respHand.rowResponse(data, null, false, service); } else { closeNoHandler(); diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java index e061e2d01..c1c7487aa 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java @@ -23,6 +23,7 @@ import com.actiontech.dble.route.util.RouterUtil; import com.actiontech.dble.server.util.SchemaUtil; import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.singleton.ProxyMeta; +import com.actiontech.dble.util.CollectionUtil; import com.actiontech.dble.util.StringUtil; import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlOutputVisitor; @@ -53,6 +54,9 @@ public class DefaultDruidParser implements DruidParser { ctx = new DruidShardingParseInfo(); schema = visitorParse(schema, rrs, stmt, schemaStatVisitor, service, isExplain); changeSql(schema, rrs, stmt); + if (!CollectionUtil.isEmpty(ctx.getTables())) { + service.getSession2().trace(t -> t.addTable(ctx.getTables())); + } return schema; } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidDeleteParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidDeleteParser.java index 983b577f5..faf2a9295 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidDeleteParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidDeleteParser.java @@ -12,6 +12,7 @@ import com.actiontech.dble.config.privileges.ShardingPrivileges; import com.actiontech.dble.config.privileges.ShardingPrivileges.CheckType; import com.actiontech.dble.route.RouteResultset; import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor; +import com.actiontech.dble.route.parser.util.Pair; import com.actiontech.dble.route.util.RouterUtil; import com.actiontech.dble.server.util.SchemaUtil; import com.actiontech.dble.server.util.SchemaUtil.SchemaInfo; @@ -28,6 +29,7 @@ import com.google.common.collect.Sets; import java.sql.SQLException; import java.sql.SQLNonTransientException; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -95,6 +97,8 @@ public class DruidDeleteParser extends DruidModifyParser { String msg = "The statement DML privilege check is not passed, sql:" + stmt.toString().replaceAll("[\\t\\n\\r]", " "); throw new SQLNonTransientException(msg); } + + service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), schemaInfo.getTable())))); SchemaConfig originSchema = schema; schema = schemaInfo.getSchemaConfig(); BaseTableConfig tc = schema.getTables().get(schemaInfo.getTable()); diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java index fbbe45f07..387c92a87 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java @@ -59,7 +59,8 @@ public class DruidInsertParser extends DruidInsertReplaceParser { } schema = schemaInfo.getSchemaConfig(); - visitor.setCurrentTable(schemaInfo.getTable()); + String tableName = schemaInfo.getTable(); + service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), tableName)))); if (insert.getQuery() != null) { tryRouteInsertQuery(service, rrs, stmt, visitor, schemaInfo); return schema; @@ -70,7 +71,6 @@ public class DruidInsertParser extends DruidInsertReplaceParser { throw new SQLNonTransientException(msg); } - String tableName = schemaInfo.getTable(); if (parserNoSharding(service, schemaName, schemaInfo, rrs, insert)) { return schema; } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertReplaceParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertReplaceParser.java index 7e88d82b1..cd140069b 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertReplaceParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertReplaceParser.java @@ -71,6 +71,7 @@ abstract class DruidInsertReplaceParser extends DruidModifyParser { for (String selectTable : visitor.getSelectTableList()) { SchemaUtil.SchemaInfo schemaInfox = SchemaUtil.getSchemaInfo(service.getUser(), schema, selectTable); tableSet.add(schemaInfox.getSchema() + "." + schemaInfox.getTable()); + service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfox.getSchema(), schemaInfox.getTable())))); if (!ShardingPrivileges.checkPrivilege(service.getUserConfig(), schemaInfox.getSchema(), schemaInfox.getTable(), ShardingPrivileges.CheckType.SELECT)) { String msg = "The statement DML privilege check is not passed, sql:" + stmt.toString().replaceAll("[\\t\\n\\r]", " "); throw new SQLNonTransientException(msg); diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java index 712028d82..8423e5026 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java @@ -63,6 +63,7 @@ public class DruidReplaceParser extends DruidInsertReplaceParser { //No sharding table check schema = schemaInfo.getSchemaConfig(); String tableName = schemaInfo.getTable(); + service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), tableName)))); if (parserNoSharding(service, schemaName, schemaInfo, rrs, replace)) { return schema; } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java index 52762bc47..7896b50fb 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java @@ -124,6 +124,9 @@ public class DruidSelectParser extends DefaultDruidParser { mysqlFrom instanceof SQLJoinTableSource || mysqlFrom instanceof SQLUnionQueryTableSource) { super.visitorParse(schema, rrs, stmt, visitor, service, isExplain); + if (!CollectionUtil.isEmpty(ctx.getTables())) { + service.getSession2().trace(t -> t.addTable(ctx.getTables())); + } return executeComplexSQL(schemaName, schema, rrs, selectStmt, service, visitor.getSelectTableList().size(), visitor.isContainsInnerFunction()); } } else if (sqlSelectQuery instanceof SQLUnionQuery) { diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidUpdateParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidUpdateParser.java index 8f21bd6fe..4c5f9ff78 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidUpdateParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidUpdateParser.java @@ -98,6 +98,7 @@ public class DruidUpdateParser extends DruidModifyParser { super.visitorParse(originSchema, rrs, stmt, visitor, service, isExplain); String tableName = schemaInfo.getTable(); + service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), tableName)))); BaseTableConfig tc = schema.getTables().get(tableName); String noShardingNode = RouterUtil.isNoSharding(schema, tableName); diff --git a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java index 93f29b397..9d7e5f668 100644 --- a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java @@ -201,7 +201,7 @@ public class RWSplitNonBlockingSession extends Session { if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) { long statementId = ByteUtil.readUB4(originPacket, 5); PreparedStatementHolder holder = rwSplitService.getPrepareStatement(statementId); - trace(t -> t.setQuery(holder.getPrepareSql())); + trace(t -> t.setQuery(holder.getPrepareSql(), holder.getSqlType())); if (holder.isMustMaster() && conn.getInstance().isReadInstance()) { holder.setExecuteOrigin(originPacket); PSHandler psHandler = new PSHandler(rwSplitService, holder); @@ -225,6 +225,7 @@ public class RWSplitNonBlockingSession extends Session { /** * jdbc compatible pre-delivery statements + * * @param master * @param originPacket * @param callback diff --git a/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java b/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java index 0e83813e3..87409bbed 100644 --- a/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java +++ b/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java @@ -52,7 +52,7 @@ public class ServerQueryHandler implements FrontendQueryHandler { sql = sql.substring(0, ParseUtil.findNextBreak(sql)); } String finalSql = sql; - this.service.getSession2().trace(t -> t.setQuery(finalSql)); + this.service.setExecuteSql(sql); if (LOGGER.isDebugEnabled()) { LOGGER.debug("{} query sql: {}", service.toString3(), (sql.length() > 1024 ? sql.substring(0, 1024) + "..." : sql)); @@ -61,6 +61,7 @@ public class ServerQueryHandler implements FrontendQueryHandler { int rs = serverParse.parse(sql); boolean isWithHint = serverParse.startWithHint(sql); int sqlType = rs & 0xff; + this.service.getSession2().trace(t -> t.setQuery(finalSql, sqlType)); if (isWithHint) { service.controlTx(TransactionOperate.QUERY); if (sqlType == ServerParse.INSERT || sqlType == ServerParse.DELETE || sqlType == ServerParse.UPDATE || diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/statistic/FrontendByBackendByEntryByUser.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/statistic/FrontendByBackendByEntryByUser.java index f2fb98255..18fedfda5 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/statistic/FrontendByBackendByEntryByUser.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/statistic/FrontendByBackendByEntryByUser.java @@ -139,7 +139,7 @@ public class FrontendByBackendByEntryByUser extends ManagerBaseTable { map.put(COLUMN_FRONTEND_HOST, v.getValue().getFrontend().getHost()); map.put(COLUMN_BACKEND_HOST, v.getValue().getBackend().getHost()); map.put(COLUMN_BACKEND_PORT, String.valueOf(v.getValue().getBackend().getPort())); - map.put(COLUMN_SHARDING_NODE, v.getValue().getBackend().getNode()); + map.put(COLUMN_SHARDING_NODE, v.getValue().getNode()); map.put(COLUMN_DB_INSTANCE, v.getValue().getBackend().getName()); map.put(COLUMN_TX_COUNT, String.valueOf(v.getValue().getTxCount())); diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java index 0c20db500..2ce627992 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java @@ -44,7 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; @@ -69,6 +72,8 @@ public class MySQLResponseService extends BackendService { private static final CommandPacket COMMIT = new CommandPacket(); private static final CommandPacket ROLLBACK = new CommandPacket(); + private volatile String traceRouteKey = null; + static { COMMIT.setPacketId(0); COMMIT.setCommand(MySQLPacket.COM_QUERY); @@ -164,7 +169,8 @@ public class MySQLResponseService extends BackendService { } StringBuilder synSQL = getSynSql(service.isAutocommit(), service); - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); if (synSQL != null) { sendQueryCmd(synSQL.toString(), service.getCharset()); } @@ -245,7 +251,8 @@ public class MySQLResponseService extends BackendService { if (synSQL == null) { // not need syn connection - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); DbleServer.getInstance().getWriteToBackendQueue().add(Collections.singletonList(sendQueryCmdTask(rrn.getStatement(), clientCharset))); return; } @@ -256,7 +263,8 @@ public class MySQLResponseService extends BackendService { // and our query sql to multi command at last synSQL.append(rrn.getStatement()).append(";"); // syn and execute others - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); // syn sharding List taskList = new ArrayList<>(1); taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset)); @@ -272,7 +280,8 @@ public class MySQLResponseService extends BackendService { try { if (synSQL == null) { // not need syn connection - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); sendQueryCmd(sql, clientCharset); return; } @@ -283,7 +292,8 @@ public class MySQLResponseService extends BackendService { // and our query sql to multi command at last synSQL.append(sql).append(";"); // syn and execute others - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); this.sendQueryCmd(synSQL.toString(), clientCharset); // waiting syn result... } finally { @@ -362,7 +372,8 @@ public class MySQLResponseService extends BackendService { protected boolean innerRelease() { if (isRowDataFlowing) { if (logResponse.compareAndSet(false, true)) { - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); } if (SystemConfig.getInstance().getEnableAsyncRelease() == 1) { DbleServer.getInstance().getComplexQueryExecutor().execute(new BackEndRecycleRunnable(this)); @@ -434,7 +445,8 @@ public class MySQLResponseService extends BackendService { } if (synSQL == null) { // not need syn connection - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); DbleServer.getInstance().getWriteToBackendQueue().add(Collections.singletonList(sendQueryCmdTask(rrn.getStatement(), clientCharset))); waitSyncResult(rrn, clientCharset); return; @@ -444,7 +456,8 @@ public class MySQLResponseService extends BackendService { // and our query sql to multi command at last // syn and execute others synSQL.append(rrn.getStatement()).append(";"); - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendRequestTime(this)); taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset)); DbleServer.getInstance().getWriteToBackendQueue().add(taskList); // waiting syn result... @@ -479,13 +492,15 @@ public class MySQLResponseService extends BackendService { } public void rollback() { - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseTxEnd(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendResponseTxEnd(this)); executeSql = "rollback"; ROLLBACK.write(this); } public void commit() { - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseTxEnd(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendResponseTxEnd(this)); executeSql = "commit"; COMMIT.write(this); } @@ -511,7 +526,8 @@ public class MySQLResponseService extends BackendService { if (task.getType() == ServiceTaskType.CLOSE) { return; } - Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseTime(this))); + if (getOriginSession() != null) + getOriginSession().trace(t -> t.setBackendResponseTime(this)); } @Override @@ -629,6 +645,20 @@ public class MySQLResponseService extends BackendService { return logResponse; } + public String getTraceRouteKey() { + if (traceRouteKey == null) { + RouteResultsetNode node; + if (this.getAttachment() instanceof RouteResultsetNode) { + node = (RouteResultsetNode) this.getAttachment(); + traceRouteKey = new StringBuilder(). + append(this.getConnection().getId()). + append(":").append(node.getName()). + append(":").append(node.getStatementHash()).toString(); + } + } + return traceRouteKey; + } + @Override public String toString() { return "MySQLResponseService[isExecuting = " + isExecuting + " attachment = " + attachment + " autocommitSynced = " + autocommitSynced + " isolationSynced = " + isolationSynced + diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitMultiQueryHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitMultiQueryHandler.java index 1baa9d8fb..f775f0319 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitMultiQueryHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitMultiQueryHandler.java @@ -39,10 +39,10 @@ public final class RWSplitMultiQueryHandler extends RWSplitQueryHandler { } String finalSql = sql.trim(); session.getService().setExecuteSql(finalSql); - session.trace(t -> t.setQuery(finalSql)); DbleHintParser.HintInfo hintInfo = DbleHintParser.parseRW(finalSql); int rs = serverParse.parse(finalSql); int sqlType = rs & 0xff; + session.trace(t -> t.setQuery(finalSql, sqlType)); Callback callback = null; if (hintInfo != null) { session.getService().controlTx(TransactionOperate.QUERY); diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java index 3cd74872f..c78a46f27 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java @@ -40,10 +40,10 @@ public class RWSplitQueryHandler implements FrontendQueryHandler { try { session.getService().queryCount(); session.getService().setExecuteSql(sql); - session.trace(t -> t.setQuery(sql)); DbleHintParser.HintInfo hintInfo = DbleHintParser.parseRW(sql); int rs = serverParse.parse(sql); int sqlType = rs & 0xff; + session.trace(t -> t.setQuery(sql, sqlType)); if (hintInfo != null) { session.executeHint(hintInfo, sqlType, sql, null); session.getService().controlTx(TransactionOperate.QUERY); diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java index be9ba2c0a..e3fb68384 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java @@ -302,7 +302,7 @@ public class RWSplitService extends BusinessService { if (isSuccess) { long statementId = ByteUtil.readUB4(resp, 5); int paramCount = ByteUtil.readUB2(resp, 11); - psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql)); + psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql, sqlType)); } }); } else { @@ -310,7 +310,7 @@ public class RWSplitService extends BusinessService { if (isSuccess) { long statementId = ByteUtil.readUB4(resp, 5); int paramCount = ByteUtil.readUB2(resp, 11); - psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql)); + psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql, sqlType)); } }); } @@ -319,7 +319,7 @@ public class RWSplitService extends BusinessService { if (isSuccess) { long statementId = ByteUtil.readUB4(resp, 5); int paramCount = ByteUtil.readUB2(resp, 11); - psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql)); + psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql, sqlType)); } }); } diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/handle/PreparedStatementHolder.java b/src/main/java/com/actiontech/dble/services/rwsplit/handle/PreparedStatementHolder.java index 65aa8ab27..037d0ba60 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/handle/PreparedStatementHolder.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/handle/PreparedStatementHolder.java @@ -14,12 +14,14 @@ public class PreparedStatementHolder { private byte[] fieldType; private boolean needAddFieldType; private String prepareSql; + private int sqlType; - public PreparedStatementHolder(byte[] prepareOrigin, int paramsCount, boolean mustMaster, String sql) { + public PreparedStatementHolder(byte[] prepareOrigin, int paramsCount, boolean mustMaster, String sql, int sqlType) { this.prepareOrigin = prepareOrigin; this.paramsCount = paramsCount; this.mustMaster = mustMaster; this.prepareSql = sql; + this.sqlType = sqlType; } public boolean isMustMaster() { @@ -63,4 +65,8 @@ public class PreparedStatementHolder { public String getPrepareSql() { return prepareSql; } + + public int getSqlType() { + return sqlType; + } } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/StatisticManager.java b/src/main/java/com/actiontech/dble/statistic/sql/StatisticManager.java index 5ed61a0e4..6c50b54b3 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/StatisticManager.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/StatisticManager.java @@ -207,6 +207,10 @@ public final class StatisticManager { } } + public boolean isPureRecordSql() { + return !enable && (samplingRate > 0 || enableAnalysis); + } + public boolean isEnableAnalysis() { return enableAnalysis; } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStat.java b/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStat.java index d75fbfdde..cce9933e1 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStat.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStat.java @@ -46,8 +46,8 @@ public class TableStat implements Comparable { this.lastExecuteTime = 0; } - public void update(int sqlType, long endTime, List relationTables) { - switch (AbstractServerParse.getBusinessType(sqlType)) { + public void update(AbstractServerParse.BusinessType businessType, long endTime, List relationTables) { + switch (businessType) { case R: this.rCount.incrementAndGet(); break; diff --git a/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStatAnalyzer.java b/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStatAnalyzer.java index e6ce53862..686395131 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStatAnalyzer.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/analyzer/TableStatAnalyzer.java @@ -5,15 +5,10 @@ package com.actiontech.dble.statistic.sql.analyzer; +import com.actiontech.dble.server.parser.AbstractServerParse; +import com.actiontech.dble.services.manager.information.ManagerTableUtil; import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry; -import com.actiontech.dble.util.StringUtil; -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.ast.statement.*; -import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter; -import com.alibaba.druid.sql.parser.SQLParserUtils; -import com.alibaba.druid.sql.parser.SQLStatementParser; -import com.alibaba.druid.sql.visitor.SQLASTVisitorAdapter; -import com.alibaba.druid.util.JdbcConstants; +import com.actiontech.dble.util.CollectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,9 +28,6 @@ public final class TableStatAnalyzer implements AbstractAnalyzer { private Map tableStatMap = new ConcurrentHashMap<>(); private ReentrantLock lock = new ReentrantLock(); - //PARSER SQL TO GET NAME - private SQLParser sqlParser = new SQLParser(); - private static final TableStatAnalyzer INSTANCE = new TableStatAnalyzer(); private TableStatAnalyzer() { @@ -47,21 +39,27 @@ public final class TableStatAnalyzer implements AbstractAnalyzer { @Override public void toAnalyzing(StatisticFrontendSqlEntry fEntry) { - String masterTable = null; - List relationTables = new ArrayList<>(); - List tables = sqlParser.parseTableNames(fEntry.getSql()); - for (int i = 0; i < tables.size(); i++) { - String table = tables.get(i); - if (i == 0) { - masterTable = table; - } else { - relationTables.add(table); - } - } - - if (masterTable != null) { - TableStat tableStat = getTableStat(masterTable); - tableStat.update(fEntry.getSqlType(), fEntry.getEndTimeMs(), relationTables); + AbstractServerParse.BusinessType businessType = AbstractServerParse.getBusinessType(fEntry.getSqlType()); + switch (businessType) { + case R: + case W: + List tableList = new ArrayList<>(fEntry.getTables()); + if (CollectionUtil.isEmpty(tableList) || + (tableList.size() > 1 && tableList.size() != tableList.stream().distinct().count())) { + tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql()); + } + String masterTable = null; + if (tableList.size() >= 1) { + masterTable = tableList.get(0); + } + if (masterTable != null) { + tableList.remove(0); + TableStat tableStat = getTableStat(masterTable); + tableStat.update(businessType, fEntry.getEndTimeMs(), tableList); + } + break; + default: + break; } } @@ -97,81 +95,4 @@ public final class TableStatAnalyzer implements AbstractAnalyzer { tableStatMap.clear(); } - - /** - * PARSER table name - */ - private static class SQLParser { - - private SQLStatement parseStmt(String sql) { - SQLStatementParser statParser = SQLParserUtils.createSQLStatementParser(sql, "mysql"); - SQLStatement stmt = statParser.parseStatement(); - return stmt; - } - - /** - * fix SCHEMA,` - * - * @param tableName - * @return - */ - private String fixName(String tableName) { - if (tableName != null) { - tableName = tableName.replace("`", ""); - int dotIdx = tableName.indexOf("."); - if (dotIdx > 0) { - tableName = tableName.substring(1 + dotIdx).trim(); - } - } - return tableName; - } - - /** - * PARSER SQL table name - */ - public List parseTableNames(String sql) { - final List tables = new ArrayList<>(); - try { - SQLStatement stmt = parseStmt(sql); - if (stmt instanceof SQLReplaceStatement) { - String table = ((SQLReplaceStatement) stmt).getTableName().getSimpleName(); - tables.add(fixName(table)); - } else if (stmt instanceof SQLInsertStatement) { - String table = ((SQLInsertStatement) stmt).getTableName().getSimpleName(); - tables.add(fixName(table)); - } else if (stmt instanceof SQLUpdateStatement) { - addTableName(tables, stmt); - } else if (stmt instanceof SQLDeleteStatement) { - String table = ((SQLDeleteStatement) stmt).getTableName().getSimpleName(); - tables.add(fixName(table)); - } else if (stmt instanceof SQLSelectStatement) { - addTableName(tables, stmt); - } - } catch (Exception e) { - LOGGER.info("TableStatAnalyzer err:" + e.toString()); - } - return tables; - } - - private void addTableName(List tables, SQLStatement stmt) { - String dbType = stmt.getDbType().name(); - if (!StringUtil.isEmpty(dbType) && JdbcConstants.MYSQL.equals(dbType)) { - stmt.accept(new MySqlASTVisitorAdapter() { - public boolean visit(SQLExprTableSource x) { - tables.add(fixName(x.toString())); - return super.visit(x); - } - }); - - } else { - stmt.accept(new SQLASTVisitorAdapter() { - public boolean visit(SQLExprTableSource x) { - tables.add(fixName(x.toString())); - return super.visit(x); - } - }); - } - } - } - } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/analyzer/UserStatAbstractAnalyzer.java b/src/main/java/com/actiontech/dble/statistic/sql/analyzer/UserStatAbstractAnalyzer.java index 8c58acccf..b1be7d0ac 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/analyzer/UserStatAbstractAnalyzer.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/analyzer/UserStatAbstractAnalyzer.java @@ -30,10 +30,10 @@ public final class UserStatAbstractAnalyzer implements AbstractAnalyzer { @Override public void toAnalyzing(final StatisticFrontendSqlEntry fEntry) { - UserStat newUserStat = new UserStat(fEntry.getFrontend()); - UserStat userStat = userStatMap.putIfAbsent(fEntry.getFrontend().getUser(), newUserStat); - if (userStat == null) { - userStat = newUserStat; + UserStat userStat; + if ((userStat = userStatMap.get(fEntry.getFrontend().getUser())) == null) { + userStat = new UserStat(fEntry.getFrontend()); + userStatMap.put(fEntry.getFrontend().getUser(), userStat); } userStat.update(fEntry); } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/entry/BackendInfo.java b/src/main/java/com/actiontech/dble/statistic/sql/entry/BackendInfo.java index c219889d9..1c5d02a28 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/entry/BackendInfo.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/entry/BackendInfo.java @@ -7,13 +7,11 @@ public class BackendInfo { String name; // db_instance String host; int port; - String node; // sharding_node - public BackendInfo(BackendConnection bConn, String node) { + public BackendInfo(BackendConnection bConn) { this.name = ((MySQLInstance) bConn.getInstance()).getName(); this.host = bConn.getHost(); this.port = bConn.getPort(); - this.node = node; } public String getName() { @@ -27,8 +25,4 @@ public class BackendInfo { public int getPort() { return port; } - - public String getNode() { - return node; - } } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticBackendSqlEntry.java b/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticBackendSqlEntry.java index 1205b140b..172e7a66a 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticBackendSqlEntry.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticBackendSqlEntry.java @@ -8,13 +8,15 @@ package com.actiontech.dble.statistic.sql.entry; public final class StatisticBackendSqlEntry extends StatisticEntry { private BackendInfo backend; private boolean isNeedToTx; + private String node; // sharding_node public StatisticBackendSqlEntry( FrontendInfo frontendInfo, - BackendInfo backendInfo, long startTime, + BackendInfo backendInfo, String node, long startTime, String sql, int sqlType, long rows, long endTime) { - super(frontendInfo, startTime, sql, rows, endTime); + super(frontendInfo, startTime, sql, sqlType, rows, endTime); this.backend = backendInfo; + this.node = node; this.sqlType = sqlType; } @@ -22,6 +24,10 @@ public final class StatisticBackendSqlEntry extends StatisticEntry { return backend; } + public String getNode() { + return node; + } + public String getSql() { return sql; } @@ -35,14 +41,14 @@ public final class StatisticBackendSqlEntry extends StatisticEntry { } public String getKey() { - StringBuffer key = new StringBuffer(); + StringBuilder key = new StringBuilder(); key.append(getFrontend().getUserId()); key.append(":"); key.append(getFrontend().getUser()); key.append(":"); key.append(getFrontend().getHost()); key.append("|"); - key.append(getBackend().getNode()); + key.append(getNode()); key.append(":"); key.append(getBackend().getName()); key.append(":"); diff --git a/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticEntry.java b/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticEntry.java index e5940eb6c..dca2ee9a4 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticEntry.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticEntry.java @@ -11,13 +11,14 @@ public class StatisticEntry { private FrontendInfo frontend; protected long rows; protected String sql; - protected int sqlType = -99; + protected int sqlType; protected long duration; public StatisticEntry(FrontendInfo frontendInfo, long startTime, - String sql, long rows, long endTime) { + String sql, int sqlType, long rows, long endTime) { this.frontend = frontendInfo; - this.sql = sql.replaceAll("[\\t\\n\\r]", " ").trim(); + this.sql = sql; + this.sqlType = sqlType; this.rows = rows; this.duration = endTime - startTime; } @@ -35,10 +36,7 @@ public class StatisticEntry { } public int getSqlType() { - if (null == sql) { - return sqlType; - } - if (sqlType == -99) { + if (sqlType == -99 && sql != null) { this.sqlType = ServerParseFactory.getShardingParser().parse(sql) & 0xff; } return sqlType; diff --git a/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticFrontendSqlEntry.java b/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticFrontendSqlEntry.java index b2eec0d11..59641e5ff 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticFrontendSqlEntry.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/entry/StatisticFrontendSqlEntry.java @@ -5,6 +5,8 @@ package com.actiontech.dble.statistic.sql.entry; +import java.util.ArrayList; + public class StatisticFrontendSqlEntry extends StatisticEntry { private String schema; @@ -14,11 +16,12 @@ public class StatisticFrontendSqlEntry extends StatisticEntry { private long resultSize; private long startTimeMs; private long endTimeMs; + private ArrayList tableList; public StatisticFrontendSqlEntry(FrontendInfo frontendInfo, long startTime, long startTimeMs, - String schema, String sql, long txId, long examinedRows, long rows, - long netOutBytes, long resultSize, long endTime, long endTimeMs) { - super(frontendInfo, startTime, sql, rows, endTime); + String schema, String sql, int sqlType, long txId, long examinedRows, long rows, + long netOutBytes, long resultSize, long endTime, long endTimeMs, ArrayList tableList) { + super(frontendInfo, startTime, sql, sqlType, rows, endTime); this.schema = schema; this.txId = txId; this.examinedRows = examinedRows; @@ -26,6 +29,7 @@ public class StatisticFrontendSqlEntry extends StatisticEntry { this.resultSize = resultSize; this.startTimeMs = startTimeMs; this.endTimeMs = endTimeMs; + this.tableList = tableList; } public long getStartTimeMs() { @@ -59,4 +63,8 @@ public class StatisticFrontendSqlEntry extends StatisticEntry { public long getResultSize() { return resultSize; } + + public ArrayList getTables() { + return tableList; + } } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/handler/AnalysisHandler.java b/src/main/java/com/actiontech/dble/statistic/sql/handler/AnalysisHandler.java index 2aeb241f7..ceb6c02d8 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/handler/AnalysisHandler.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/handler/AnalysisHandler.java @@ -35,7 +35,10 @@ public class AnalysisHandler implements StatisticDataHandler { if (!StatisticManager.getInstance().isEnableAnalysis()) { return; } - StatisticEntry entry = statisticEvent.getEntry(); + handle(statisticEvent.getEntry()); + } + + public void handle(StatisticEntry entry) { if (entry instanceof StatisticFrontendSqlEntry) { StatisticFrontendSqlEntry frontendSqlEntry = (StatisticFrontendSqlEntry) entry; for (AbstractAnalyzer listener : listeners) { diff --git a/src/main/java/com/actiontech/dble/statistic/sql/handler/AssociateTablesByEntryByUserCalcHandler.java b/src/main/java/com/actiontech/dble/statistic/sql/handler/AssociateTablesByEntryByUserCalcHandler.java index ff61da2af..7702a99ac 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/handler/AssociateTablesByEntryByUserCalcHandler.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/handler/AssociateTablesByEntryByUserCalcHandler.java @@ -11,6 +11,7 @@ import com.actiontech.dble.statistic.sql.StatisticManager; import com.actiontech.dble.statistic.sql.entry.FrontendInfo; import com.actiontech.dble.statistic.sql.entry.StatisticEntry; import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry; +import com.actiontech.dble.util.CollectionUtil; import java.util.*; @@ -43,7 +44,11 @@ public class AssociateTablesByEntryByUserCalcHandler implements StatisticDataHan if (entry instanceof StatisticFrontendSqlEntry) { StatisticFrontendSqlEntry fEntry = ((StatisticFrontendSqlEntry) entry); if (fEntry.getSqlType() == 7) { - List tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql()); + List tableList = new ArrayList<>(fEntry.getTables()); + if (CollectionUtil.isEmpty(tableList) || + (tableList.size() > 1 && tableList.size() != tableList.stream().distinct().count())) { + tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql()); + } if (!tableList.isEmpty() && tableList.size() > 1) { Collections.sort(tableList); String tables = String.join(",", tableList); diff --git a/src/main/java/com/actiontech/dble/statistic/sql/handler/FrontendByBackendByEntryByUserCalcHandler.java b/src/main/java/com/actiontech/dble/statistic/sql/handler/FrontendByBackendByEntryByUserCalcHandler.java index e27d7a41c..797460587 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/handler/FrontendByBackendByEntryByUserCalcHandler.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/handler/FrontendByBackendByEntryByUserCalcHandler.java @@ -51,10 +51,11 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH boolean isNew = currRecord == null; if (isNew) { checkEliminate(); - currRecord = new Record(entry.getFrontend().getUserId(), entry.getFrontend(), backendSqlEntry.getBackend()); + currRecord = new Record(entry.getFrontend().getUserId(), entry.getFrontend(), backendSqlEntry.getBackend(), backendSqlEntry.getNode()); } - if (backendSqlEntry.getSqlType() == 4 || backendSqlEntry.getSqlType() == 11 || backendSqlEntry.getSqlType() == 3 || backendSqlEntry.getSqlType() == 7) { - switch (backendSqlEntry.getSqlType()) { + int sqlType = backendSqlEntry.getSqlType(); + if (sqlType == 4 || sqlType == 11 || sqlType == 3 || sqlType == 7) { + switch (sqlType) { case ServerParse.INSERT: currRecord.addInsert(backendSqlEntry.getRows(), backendSqlEntry.getDuration()); break; @@ -102,6 +103,7 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH int entry; FrontendInfo frontend; BackendInfo backend; + String node; int txCount = 0; long txRows = 0L; @@ -125,10 +127,11 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH long lastUpdateTime = 0L; - public Record(int entry, FrontendInfo frontend, BackendInfo backend) { + public Record(int entry, FrontendInfo frontend, BackendInfo backend, String node) { this.entry = entry; this.frontend = frontend; this.backend = backend; + this.node = node; } public void incrementTx() { @@ -185,6 +188,10 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH return backend; } + public String getNode() { + return node; + } + public long getUpdateTime() { return updateTime; } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/handler/SqlStatisticHandler.java b/src/main/java/com/actiontech/dble/statistic/sql/handler/SqlStatisticHandler.java index ab94a5fa8..ca5f11fce 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/handler/SqlStatisticHandler.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/handler/SqlStatisticHandler.java @@ -16,11 +16,13 @@ import com.alibaba.druid.sql.visitor.ParameterizedOutputVisitorUtils; import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class SqlStatisticHandler implements StatisticDataHandler { private final ConcurrentSkipListMap txRecords = new ConcurrentSkipListMap<>(); + private AtomicLong txRecordSize = new AtomicLong(0); private volatile BitSet sampleDecisions; public SqlStatisticHandler() { @@ -32,16 +34,20 @@ public class SqlStatisticHandler implements StatisticDataHandler { if (StatisticManager.getInstance().getSamplingRate() == 0) { return; } + handle(statisticEvent.getEntry()); + } - StatisticEntry entry = statisticEvent.getEntry(); + public void handle(StatisticEntry entry) { if (entry instanceof StatisticFrontendSqlEntry) { StatisticFrontendSqlEntry frontendSqlEntry = (StatisticFrontendSqlEntry) entry; if (sampleDecisions.get((int) (frontendSqlEntry.getTxId() % 100))) { if (null == txRecords.get(frontendSqlEntry.getTxId())) { - if (txRecords.size() >= StatisticManager.getInstance().getSqlLogSize()) { + if (txRecordSize.intValue() >= StatisticManager.getInstance().getSqlLogSize()) { txRecords.pollFirstEntry(); + txRecordSize.decrementAndGet(); } txRecords.put(frontendSqlEntry.getTxId(), new TxRecord(frontendSqlEntry)); + txRecordSize.incrementAndGet(); checkEliminate(); } else { txRecords.get(frontendSqlEntry.getTxId()).getSqls().add(new SQLRecord(frontendSqlEntry)); @@ -52,9 +58,10 @@ public class SqlStatisticHandler implements StatisticDataHandler { private void checkEliminate() { int removeIndex; - if ((removeIndex = txRecords.size() - StatisticManager.getInstance().getSqlLogSize()) > 0) { + if ((removeIndex = txRecordSize.intValue() - StatisticManager.getInstance().getSqlLogSize()) > 0) { while (removeIndex-- > 0) { txRecords.pollFirstEntry(); + txRecordSize.decrementAndGet(); } } } @@ -68,6 +75,7 @@ public class SqlStatisticHandler implements StatisticDataHandler { @Override public void clear() { txRecords.clear(); + txRecordSize.set(0); } private BitSet randomBitSet(int cardinality, Random rnd) { @@ -149,18 +157,11 @@ public class SqlStatisticHandler implements StatisticDataHandler { private long duration; private long startTime; + private AtomicBoolean init = new AtomicBoolean(false); + public SQLRecord(StatisticFrontendSqlEntry entry) { this.sqlId = SQL_ID_GENERATOR.incrementAndGet(); this.stmt = entry.getSql(); - if (stmt.equalsIgnoreCase("begin")) { - this.sqlDigest = "begin"; - } else { - try { - this.sqlDigest = ParameterizedOutputVisitorUtils.parameterize(this.stmt, DbType.mysql).replaceAll("[\\t\\n\\r]", " "); - } catch (RuntimeException e) { - this.sqlDigest = "Other"; - } - } this.sqlType = entry.getSqlType(); this.txId = entry.getTxId(); @@ -228,6 +229,18 @@ public class SqlStatisticHandler implements StatisticDataHandler { } public String getSqlDigest() { + if (init.compareAndSet(false, true)) { + try { + if (stmt.equalsIgnoreCase("begin")) { + this.sqlDigest = "begin"; + } else { + String tmpStmt = ParameterizedOutputVisitorUtils.parameterize(this.stmt, DbType.mysql); + this.sqlDigest = tmpStmt.replaceAll("[\\t\\n\\r]", " "); + } + } catch (RuntimeException e) { + this.sqlDigest = "Other"; + } + } return sqlDigest; } diff --git a/src/main/java/com/actiontech/dble/statistic/sql/handler/TableByUserByEntryCalcHandler.java b/src/main/java/com/actiontech/dble/statistic/sql/handler/TableByUserByEntryCalcHandler.java index 4924f1bbe..57f877f95 100644 --- a/src/main/java/com/actiontech/dble/statistic/sql/handler/TableByUserByEntryCalcHandler.java +++ b/src/main/java/com/actiontech/dble/statistic/sql/handler/TableByUserByEntryCalcHandler.java @@ -12,6 +12,7 @@ import com.actiontech.dble.statistic.sql.StatisticManager; import com.actiontech.dble.statistic.sql.entry.FrontendInfo; import com.actiontech.dble.statistic.sql.entry.StatisticEntry; import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry; +import com.actiontech.dble.util.CollectionUtil; import java.util.*; @@ -43,48 +44,60 @@ public class TableByUserByEntryCalcHandler implements StatisticDataHandler { synchronized (records) { if (entry instanceof StatisticFrontendSqlEntry) { StatisticFrontendSqlEntry fEntry = ((StatisticFrontendSqlEntry) entry); - Set tableSet = new HashSet<>(ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql())); - if (tableSet.isEmpty()) { - // dual, no table - toRecord("null", fEntry); - } else { - for (String t : tableSet) { - toRecord(t, fEntry); + int sqlType = fEntry.getSqlType(); + if (sqlType == 4 || sqlType == 11 || sqlType == 3 || sqlType == 7) { + List tableList = new ArrayList<>(fEntry.getTables()); + if (CollectionUtil.isEmpty(tableList) || + (tableList.size() > 1 && tableList.size() != tableList.stream().distinct().count())) { + tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql()); + } + if (tableList.isEmpty()) { + // dual, no table + toRecord("null", fEntry, sqlType); + } else { + if (sqlType != 7 && tableList.size() > 1) { + toRecord(tableList.get(0), fEntry, sqlType); + } else { + Set tableSet = new HashSet<>(); + tableSet.addAll(tableList); + for (String t : tableSet) { + toRecord(t, fEntry, sqlType); + } + } + } } } } } - private void toRecord(String table, StatisticFrontendSqlEntry fEntry) { - if (fEntry.getSqlType() == 4 || fEntry.getSqlType() == 11 || fEntry.getSqlType() == 3 || fEntry.getSqlType() == 7) { - String key = fEntry.getFrontend().getUserId() + "-" + fEntry.getFrontend().getUser() + "-" + table; - Record currRecord = records.get(key); - boolean isNew = currRecord == null; - if (isNew) { - checkEliminate(); - currRecord = new Record(fEntry.getFrontend().getUserId(), fEntry.getFrontend(), table); - } - switch (fEntry.getSqlType()) { - case ServerParse.INSERT: - currRecord.addInsert(fEntry.getRows(), fEntry.getDuration()); - break; - case ServerParse.UPDATE: - currRecord.addUpdate(fEntry.getRows(), fEntry.getDuration()); - break; - case ServerParse.DELETE: - currRecord.addDelete(fEntry.getRows(), fEntry.getDuration()); - break; - case ServerParse.SELECT: - currRecord.addSelect(fEntry.getExaminedRows(), fEntry.getRows(), fEntry.getDuration()); - break; - default: - // ignore - break; - } - if (isNew) { - records.put(key, currRecord); - } + private void toRecord(String table, StatisticFrontendSqlEntry fEntry, int sqlType) { + String key = (new StringBuilder().append(fEntry.getFrontend().getUserId()).append("-").append(fEntry.getFrontend().getUser()).append("-").append(table)).toString(); + Record currRecord = records.get(key); + boolean isNew = currRecord == null; + if (isNew) { + checkEliminate(); + currRecord = new Record(fEntry.getFrontend().getUserId(), fEntry.getFrontend(), table); + } + switch (sqlType) { + case ServerParse.INSERT: + currRecord.addInsert(fEntry.getRows(), fEntry.getDuration()); + break; + case ServerParse.UPDATE: + currRecord.addUpdate(fEntry.getRows(), fEntry.getDuration()); + break; + case ServerParse.DELETE: + currRecord.addDelete(fEntry.getRows(), fEntry.getDuration()); + break; + case ServerParse.SELECT: + currRecord.addSelect(fEntry.getExaminedRows(), fEntry.getRows(), fEntry.getDuration()); + break; + default: + // ignore + break; + } + if (isNew) { + records.put(key, currRecord); } } diff --git a/src/main/java/com/actiontech/dble/statistic/stat/QueryTimeCost.java b/src/main/java/com/actiontech/dble/statistic/stat/QueryTimeCost.java index f5a15fef4..6a0281478 100644 --- a/src/main/java/com/actiontech/dble/statistic/stat/QueryTimeCost.java +++ b/src/main/java/com/actiontech/dble/statistic/stat/QueryTimeCost.java @@ -20,8 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger; public class QueryTimeCost implements Cloneable { public static final Logger LOGGER = LoggerFactory.getLogger(QueryTimeCost.class); - private final CostTimeProvider provider; - private final ComplexQueryProvider xProvider; + private volatile CostTimeProvider provider; + private volatile ComplexQueryProvider xProvider; private long connId = 0; private long requestTime = 0; @@ -38,11 +38,11 @@ public class QueryTimeCost implements Cloneable { public QueryTimeCost(long connId) { this.connId = connId; - this.xProvider = new ComplexQueryProvider(); - this.provider = new CostTimeProvider(); } public void setRequestTime(long requestTime) { + if (this.xProvider == null) this.xProvider = new ComplexQueryProvider(); + if (this.provider == null) this.provider = new CostTimeProvider(); reset(); this.requestTime = requestTime; provider.beginRequest(connId); diff --git a/src/main/java/com/actiontech/dble/statistic/trace/AbstractTrackProbe.java b/src/main/java/com/actiontech/dble/statistic/trace/AbstractTrackProbe.java index 46e2287c1..fa9852825 100644 --- a/src/main/java/com/actiontech/dble/statistic/trace/AbstractTrackProbe.java +++ b/src/main/java/com/actiontech/dble/statistic/trace/AbstractTrackProbe.java @@ -6,12 +6,14 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler; import com.actiontech.dble.net.service.AbstractService; import com.actiontech.dble.route.RouteResultset; +import com.actiontech.dble.route.parser.util.Pair; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.services.rwsplit.RWSplitService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.function.Consumer; public abstract class AbstractTrackProbe { @@ -23,7 +25,10 @@ public abstract class AbstractTrackProbe { public void startProcess() { } - public void setQuery(String sql) { + public void setQuery(String sql, int sqlType) { + } + + public void addTable(List> tables) { } public void endParse() { diff --git a/src/main/java/com/actiontech/dble/statistic/trace/RwTraceResult.java b/src/main/java/com/actiontech/dble/statistic/trace/RwTraceResult.java index 2f132feea..5986136d0 100644 --- a/src/main/java/com/actiontech/dble/statistic/trace/RwTraceResult.java +++ b/src/main/java/com/actiontech/dble/statistic/trace/RwTraceResult.java @@ -4,16 +4,14 @@ import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.statistic.sql.StatisticManager; -import com.actiontech.dble.statistic.sql.entry.BackendInfo; -import com.actiontech.dble.statistic.sql.entry.FrontendInfo; import com.actiontech.dble.statistic.sql.entry.StatisticBackendSqlEntry; import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; public class RwTraceResult implements Cloneable { @@ -24,24 +22,26 @@ public class RwTraceResult implements Cloneable { protected long requestEnd; protected long requestEndMs; protected String sql; + protected int sqlType = -1; protected String schema; protected long sqlRows; protected volatile long examinedRows; protected long netOutBytes; protected long resultSize; - protected FrontendInfo frontendInfo; final RWSplitNonBlockingSession currentSession; protected List actualRouteList = Lists.newCopyOnWriteArrayList(); protected volatile long previousTxId = 0; + // only samplingRate=100 + protected boolean pureRecordSql = true; public RwTraceResult(RWSplitNonBlockingSession currentSession) { this.currentSession = currentSession; - this.frontendInfo = new FrontendInfo(currentSession.getService()); } public void setRequestTime(long time, long timeMs) { reset(); + this.pureRecordSql = StatisticManager.getInstance().isPureRecordSql(); this.requestStart = time; this.requestStartMs = timeMs; } @@ -50,14 +50,20 @@ public class RwTraceResult implements Cloneable { this.parseStart = time; } - public void setQuery(String sql0) { + public void setQuery(String sql0, int sqlType0) { this.schema = currentSession.getService().getSchema(); this.sql = sql0; + this.sqlType = sqlType0; // multi-query if (currentSession.getIsMultiStatement().get() && currentSession.getMultiQueryHandler() != null) { - Optional find = actualRouteList.stream().filter(f -> (f.handler == currentSession.getMultiQueryHandler())).findFirst(); - if (find.isPresent()) { - ActualRoute ar = find.get(); + ActualRoute ar = null; + for (ActualRoute a : actualRouteList) { + if (a.handler == currentSession.getMultiQueryHandler()) { + ar = a; + break; + } + } + if (ar != null) { ar.setSql(sql0); ar.setRow(0); ar.setFinished(0); @@ -70,9 +76,15 @@ public class RwTraceResult implements Cloneable { public void setBackendRequestTime(MySQLResponseService service, long time) { final ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null && sql != null) { - Optional find = actualRouteList.stream().filter(f -> (f.handler == responseHandler)).findFirst(); - if (!find.isPresent()) { - ActualRoute ar = new ActualRoute(responseHandler, sql, time); + ActualRoute ar = null; + for (ActualRoute a : actualRouteList) { + if (a.handler == responseHandler) { + ar = a; + break; + } + } + if (ar == null) { + ar = new ActualRoute(responseHandler, sql, time); actualRouteList.add(ar); } } @@ -82,9 +94,14 @@ public class RwTraceResult implements Cloneable { public void setBackendSqlAddRows(MySQLResponseService service, Long num) { final ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null && sql != null) { - Optional find = actualRouteList.stream().filter(f -> (f.handler == responseHandler)).findFirst(); - if (find.isPresent()) { - ActualRoute ar = find.get(); + ActualRoute ar = null; + for (ActualRoute a : actualRouteList) { + if (a.handler == responseHandler) { + ar = a; + break; + } + } + if (ar != null) { if (num == null) { ar.addRow(); } else { @@ -97,16 +114,21 @@ public class RwTraceResult implements Cloneable { public void setBackendResponseEndTime(MySQLResponseService service, long time) { ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null && sql != null) { - Optional find = actualRouteList.stream().filter(f -> (f.handler == responseHandler && f.finished == 0)).findFirst(); - if (find.isPresent()) { - ActualRoute ar = find.get(); + ActualRoute ar = null; + for (ActualRoute a : actualRouteList) { + if (a.handler == responseHandler && a.finished == 0) { + ar = a; + break; + } + } + if (ar != null) { ar.setFinished(time); - examinedRows += ar.getRow(); + if (pureRecordSql) return; StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry( - frontendInfo, - new BackendInfo(service.getConnection(), "-"), - ar.getRequestTime(), ar.getSql(), -99, ar.getRow(), ar.getFinished()); + currentSession.getTraceFrontendInfo(), + service.getConnection().getTraceBackendInfo(), "-", + ar.getRequestTime(), ar.getSql(), sqlType, ar.getRow(), ar.getFinished()); bEntry.setNeedToTx(isNeedToTx()); StatisticManager.getInstance().push(bEntry); } @@ -114,10 +136,11 @@ public class RwTraceResult implements Cloneable { } public void setBackendResponseTxEnd(MySQLResponseService service, long time) { + if (pureRecordSql) return; if (!isNeedToTx()) { StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry( - new FrontendInfo(currentSession.getService()), - new BackendInfo(service.getConnection(), "-"), + currentSession.getTraceFrontendInfo(), + service.getConnection().getTraceBackendInfo(), "-", time, "/** txEnd **/", 0, 0, time); bEntry.setNeedToTx(true); StatisticManager.getInstance().push(bEntry); @@ -135,9 +158,9 @@ public class RwTraceResult implements Cloneable { this.requestEnd = time; this.requestEndMs = timeMs; if (this.isCompletedV1() && isSuccess) { - StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(frontendInfo, requestStart, requestStartMs, - schema, sql, currentSession.getService().getTxId(), examinedRows, sqlRows, - netOutBytes, resultSize, requestEnd, requestEndMs); + StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(currentSession.getTraceFrontendInfo(), requestStart, requestStartMs, + schema, sql, sqlType, currentSession.getService().getTxId(), examinedRows, sqlRows, + netOutBytes, resultSize, requestEnd, requestEndMs, new ArrayList<>()); StatisticManager.getInstance().push(f); } } @@ -145,7 +168,6 @@ public class RwTraceResult implements Cloneable { public void setExit() { reset(); - frontendInfo = null; } public boolean isNeedToTx() { @@ -170,6 +192,7 @@ public class RwTraceResult implements Cloneable { requestEnd = 0; requestEndMs = 0; sql = null; + sqlType = -1; schema = null; sqlRows = 0; examinedRows = 0; diff --git a/src/main/java/com/actiontech/dble/statistic/trace/RwTrackProbe.java b/src/main/java/com/actiontech/dble/statistic/trace/RwTrackProbe.java index 063196e36..97e9a22ab 100644 --- a/src/main/java/com/actiontech/dble/statistic/trace/RwTrackProbe.java +++ b/src/main/java/com/actiontech/dble/statistic/trace/RwTrackProbe.java @@ -30,8 +30,8 @@ public class RwTrackProbe extends AbstractTrackProbe { sqlTracking(t -> t.startProcess(System.nanoTime())); } - public void setQuery(String sql) { - sqlTracking(t -> t.setQuery(sql)); + public void setQuery(String sql, int sqlType) { + sqlTracking(t -> t.setQuery(sql, sqlType)); } public void setBackendRequestTime(MySQLResponseService service) { diff --git a/src/main/java/com/actiontech/dble/statistic/trace/TraceResult.java b/src/main/java/com/actiontech/dble/statistic/trace/TraceResult.java index 6864a410a..74186a2c4 100644 --- a/src/main/java/com/actiontech/dble/statistic/trace/TraceResult.java +++ b/src/main/java/com/actiontech/dble/statistic/trace/TraceResult.java @@ -11,13 +11,11 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler; import com.actiontech.dble.route.RouteResultsetNode; +import com.actiontech.dble.route.parser.util.Pair; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.server.status.SlowQueryLog; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; -import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.statistic.sql.StatisticManager; -import com.actiontech.dble.statistic.sql.entry.BackendInfo; -import com.actiontech.dble.statistic.sql.entry.FrontendInfo; import com.actiontech.dble.statistic.sql.entry.StatisticBackendSqlEntry; import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry; import com.google.common.collect.Lists; @@ -25,9 +23,9 @@ import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -60,12 +58,13 @@ public class TraceResult implements Cloneable { protected boolean subQuery = false; protected String sql; + protected int sqlType = -1; protected String schema; + protected ArrayList tableList = new ArrayList<>(); protected long sqlRows = 0; protected long netOutBytes; protected long resultSize; protected TraceResult previous = null; - protected FrontendInfo frontendInfo; protected NonBlockingSession currentSession; /* * when 'set trace = 1' or 'enableSlowLog==true', need to record the time spent in each phase; @@ -77,17 +76,18 @@ public class TraceResult implements Cloneable { * */ protected boolean isDetailTrace = false; + // only samplingRate=100 + protected boolean pureRecordSql = true; public TraceResult(NonBlockingSession session0) { this.currentSession = session0; - ShardingService shardingService = currentSession.getShardingService(); - this.frontendInfo = new FrontendInfo(shardingService); } public void setRequestTime(long time, long timeMs) { copyToPrevious(); reset(); this.isDetailTrace = currentSession.isTraceEnable() || SlowQueryLog.getInstance().isEnableSlowLog(); + this.pureRecordSql = !isDetailTrace && StatisticManager.getInstance().isPureRecordSql(); this.requestStart = time; this.requestStartMs = timeMs; } @@ -98,9 +98,16 @@ public class TraceResult implements Cloneable { this.parseStart = time; } - public void setQuery(String sql0) { + public void setQuery(String sql0, int sqlType0) { this.schema = currentSession.getShardingService().getSchema(); this.sql = sql0; + this.sqlType = sqlType0; + } + + public void addTable(List> tables) { // schema.table + for (Pair p : tables) { + tableList.add(p.getKey() + "." + p.getValue()); + } } public void endParse(long time) { @@ -130,10 +137,16 @@ public class TraceResult implements Cloneable { final ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null) { RouteResultsetNode node = (RouteResultsetNode) service.getAttachment(); - String key = service.getConnection().getId() + ":" + node.getName() + ":" + +node.getStatementHash(); - Optional find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key))).findFirst(); - if (!find.isPresent()) { - BackendRoute ar = new BackendRoute(responseHandler, key, node.getName(), node.getStatement(), time); + String key = service.getTraceRouteKey(); + BackendRoute ar = null; + for (BackendRoute b : backendRouteList) { + if (b.handler == responseHandler && b.routeKey.equals(key)) { + ar = b; + break; + } + } + if (ar == null) { + ar = new BackendRoute(responseHandler, key, node.getName(), node.getStatement(), time); backendRouteList.add(ar); } } @@ -142,11 +155,15 @@ public class TraceResult implements Cloneable { public void setBackendResponseTime(MySQLResponseService service, long time) { final ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null) { - RouteResultsetNode node = (RouteResultsetNode) service.getAttachment(); - String key = service.getConnection().getId() + ":" + node.getName() + ":" + node.getStatementHash(); - Optional find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key) && f.firstRevTime == 0)).findFirst(); - if (find.isPresent()) { - BackendRoute ar = find.get(); + String key = service.getTraceRouteKey(); + BackendRoute ar = null; + for (BackendRoute b : backendRouteList) { + if (b.handler == responseHandler && b.routeKey.equals(key) && b.firstRevTime == 0) { + ar = b; + break; + } + } + if (ar != null) { ar.setFirstRevTime(time); ar.setMysqlResponseService(service); } @@ -156,11 +173,15 @@ public class TraceResult implements Cloneable { public void setBackendSqlAddRows(MySQLResponseService service, Long num) { final ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null) { - RouteResultsetNode node = (RouteResultsetNode) service.getAttachment(); - String key = service.getConnection().getId() + ":" + node.getName() + ":" + node.getStatementHash(); - Optional find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key) && f.firstRevTime != 0)).findFirst(); - if (find.isPresent()) { - BackendRoute ar = find.get(); + String key = service.getTraceRouteKey(); + BackendRoute ar = null; + for (BackendRoute b : backendRouteList) { + if (b.handler == responseHandler && b.routeKey.equals(key) && b.firstRevTime != 0) { + ar = b; + break; + } + } + if (ar != null) { if (num == null) { ar.getRow().incrementAndGet(); } else { @@ -174,15 +195,21 @@ public class TraceResult implements Cloneable { ResponseHandler responseHandler = service.getResponseHandler(); if (responseHandler != null) { RouteResultsetNode node = (RouteResultsetNode) service.getAttachment(); - String key = service.getConnection().getId() + ":" + node.getName() + ":" + node.getStatementHash(); - Optional find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key) && f.firstRevTime != 0 && f.finished == 0)).findFirst(); - if (find.isPresent()) { - BackendRoute ar = find.get(); + String key = service.getTraceRouteKey(); + BackendRoute ar = null; + for (BackendRoute b : backendRouteList) { + if (b.handler == responseHandler && b.routeKey.equals(key) && b.firstRevTime != 0 && b.finished == 0) { + ar = b; + break; + } + } + if (ar != null) { ar.setFinished(time); ar.setAutocommit(service.isAutocommit()); + if (pureRecordSql) return; StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry( - frontendInfo, - new BackendInfo(service.getConnection(), node.getName()), + currentSession.getTraceFrontendInfo(), + service.getConnection().getTraceBackendInfo(), node.getName(), ar.getRequestTime(), ar.getSql(), node.getSqlType(), ar.getRow().get(), ar.getFinished()); bEntry.setNeedToTx(ar.isAutocommit()); StatisticManager.getInstance().push(bEntry); @@ -192,18 +219,24 @@ public class TraceResult implements Cloneable { public void setBackendTerminateByComplex(MultiNodeMergeHandler mergeHandler, long time) { for (BaseDMLHandler handler : mergeHandler.getExeHandlers()) { - Optional find = backendRouteList.stream().filter(f -> (f.handler == handler && f.firstRevTime != 0 && f.finished == 0)).findFirst(); - if (find.isPresent()) { - BackendRoute ar = find.get(); + BackendRoute ar = null; + for (BackendRoute b : backendRouteList) { + if (b.handler == handler && b.firstRevTime != 0 && b.finished == 0) { + ar = b; + break; + } + } + if (ar != null) { ar.setFinished(time); + if (pureRecordSql) return; MySQLResponseService service; if ((service = ar.getMysqlResponseService()) != null) { RouteResultsetNode node = (RouteResultsetNode) service.getAttachment(); if (node != null) { ar.setAutocommit(service.isAutocommit()); StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry( - frontendInfo, - new BackendInfo(service.getConnection(), node.getName()), + currentSession.getTraceFrontendInfo(), + service.getConnection().getTraceBackendInfo(), node.getName(), ar.getRequestTime(), ar.getSql(), node.getSqlType(), ar.getRow().get(), ar.getFinished()); bEntry.setNeedToTx(ar.isAutocommit()); StatisticManager.getInstance().push(bEntry); @@ -215,11 +248,12 @@ public class TraceResult implements Cloneable { // commit、rollback、quit public void setBackendResponseTxEnd(MySQLResponseService service, long time) { + if (pureRecordSql) return; if (!service.isAutocommit()) { RouteResultsetNode node = (RouteResultsetNode) service.getAttachment(); StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry( - new FrontendInfo(currentSession.getShardingService()), - new BackendInfo(service.getConnection(), node.getName()), + currentSession.getTraceFrontendInfo(), + service.getConnection().getTraceBackendInfo(), node.getName(), time, "/** txEnd **/", 0, 0, time); bEntry.setNeedToTx(true); StatisticManager.getInstance().push(bEntry); @@ -245,10 +279,10 @@ public class TraceResult implements Cloneable { this.requestEnd = time; this.requestEndMs = timeMs; if (this.isCompletedV1() && isSuccess) { - long examinedRows = backendRouteList.stream().filter(f -> f.finished != 0).mapToLong(m -> m.getRow().get()).sum(); - StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(frontendInfo, requestStart, requestStartMs, - schema, sql, currentSession.getShardingService().getTxId(), examinedRows, sqlRows, - netOutBytes, resultSize, requestEnd, requestEndMs); + long examinedRows = getExaminedRows(); + StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(currentSession.getTraceFrontendInfo(), requestStart, requestStartMs, + schema, sql, sqlType, currentSession.getShardingService().getTxId(), examinedRows, sqlRows, + netOutBytes, resultSize, requestEnd, requestEndMs, new ArrayList(tableList)); StatisticManager.getInstance().push(f); if (isDetailTrace) { SlowQueryLog.getInstance().putSlowQueryLog(currentSession.getShardingService(), this.clone()); @@ -260,10 +294,19 @@ public class TraceResult implements Cloneable { } } + private long getExaminedRows() { + long examinedRows = 0; + for (BackendRoute backendRoute : backendRouteList) { + if (backendRoute.finished != 0) { + examinedRows += backendRoute.getRow().get(); + } + } + return examinedRows; + } + public void setExit() { reset(); previous = null; - frontendInfo = null; } public void setShardingNodes(RouteResultsetNode[] shardingNodes) { @@ -324,7 +367,9 @@ public class TraceResult implements Cloneable { adtCommitBegin = 0; adtCommitEnd = 0; sql = null; + sqlType = -1; schema = null; + tableList.clear(); sqlRows = 0; netOutBytes = 0; resultSize = 0; diff --git a/src/main/java/com/actiontech/dble/statistic/trace/TrackProbe.java b/src/main/java/com/actiontech/dble/statistic/trace/TrackProbe.java index bddb334fb..fdd31cb1e 100644 --- a/src/main/java/com/actiontech/dble/statistic/trace/TrackProbe.java +++ b/src/main/java/com/actiontech/dble/statistic/trace/TrackProbe.java @@ -6,6 +6,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.route.RouteResultset; +import com.actiontech.dble.route.parser.util.Pair; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.server.SessionStage; import com.actiontech.dble.server.status.SlowQueryLog; @@ -15,9 +16,8 @@ import com.actiontech.dble.statistic.stat.QueryTimeCost; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Consumer; public class TrackProbe extends AbstractTrackProbe { public static final Logger LOGGER = LoggerFactory.getLogger(TrackProbe.class); @@ -41,142 +41,183 @@ public class TrackProbe extends AbstractTrackProbe { sessionStage = SessionStage.Read_SQL; long requestTime = System.nanoTime(); isTrace = currentSession.isTraceEnable() || SlowQueryLog.getInstance().isEnableSlowLog() || StatisticManager.getInstance().mainSwitch(); - sqlTracking(t -> t.setRequestTime(requestTime, System.currentTimeMillis())); + if (isTrace) + traceResult.setRequestTime(requestTime, System.currentTimeMillis()); timeCost = (SystemConfig.getInstance().getUseCostTimeStat() != 0) && !(ThreadLocalRandom.current().nextInt(100) >= SystemConfig.getInstance().getCostSamplePercent()); - sqlCosting(c -> c.setRequestTime(requestTime)); + if (timeCost) + queryTimeCost.setRequestTime(requestTime); } public void startProcess() { sessionStage = SessionStage.Parse_SQL; long startProcess = System.nanoTime(); - sqlTracking(t -> t.startProcess(startProcess)); - sqlCosting(c -> c.startProcess()); + if (isTrace) + traceResult.startProcess(startProcess); + if (timeCost) + queryTimeCost.startProcess(); } - public void setQuery(String sql) { - sqlTracking(t -> t.setQuery(sql)); + public void setQuery(String sql, int sqlType) { + if (isTrace) + traceResult.setQuery(sql, sqlType); + } + + public void addTable(List> tables) { + if (isTrace) + traceResult.addTable(tables); } public void endParse() { sessionStage = SessionStage.Route_Calculation; - sqlTracking(t -> t.endParse(System.nanoTime())); - sqlCosting(c -> c.endParse()); + if (isTrace) + traceResult.endParse(System.nanoTime()); + if (timeCost) + queryTimeCost.endParse(); } public void endRoute(RouteResultset rrs) { sessionStage = SessionStage.Prepare_to_Push; - sqlTracking(t -> t.endRoute(System.nanoTime())); - sqlCosting(c -> c.endRoute(rrs)); + if (isTrace) + traceResult.endRoute(System.nanoTime()); + if (timeCost) + queryTimeCost.endRoute(rrs); } public void endComplexRoute() { - sqlCosting(c -> c.endComplexRoute()); + if (timeCost) + queryTimeCost.endComplexRoute(); } public void endComplexExecute() { - sqlCosting(c -> c.endComplexExecute()); + if (timeCost) + queryTimeCost.endComplexExecute(); } public void readyToDeliver() { - sqlCosting(c -> c.readyToDeliver()); + if (timeCost) + queryTimeCost.readyToDeliver(); } public void setPreExecuteEnd(TraceResult.SqlTraceType type) { sessionStage = SessionStage.Execute_SQL; - sqlTracking(t -> t.setPreExecuteEnd(type, System.nanoTime())); + if (isTrace) + traceResult.setPreExecuteEnd(type, System.nanoTime()); } public void setSubQuery() { - sqlTracking(t -> t.setSubQuery()); + if (isTrace) + traceResult.setSubQuery(); } public void setBackendRequestTime(MySQLResponseService service) { long requestTime0 = System.nanoTime(); - sqlTracking(t -> t.setBackendRequestTime(service, requestTime0)); - sqlCosting(c -> c.setBackendRequestTime(service.getConnection().getId(), requestTime0)); + if (isTrace) + traceResult.setBackendRequestTime(service, requestTime0); + if (timeCost) + queryTimeCost.setBackendRequestTime(service.getConnection().getId(), requestTime0); } // receives the response package (before being pushed into BackendService.taskQueue) public void setBackendResponseTime(MySQLResponseService service) { sessionStage = SessionStage.Fetching_Result; long responseTime = System.nanoTime(); - sqlTracking(t -> t.setBackendResponseTime(service, responseTime)); - sqlCosting(c -> c.setBackendResponseTime(service.getConnection().getId(), responseTime)); + if (isTrace) + traceResult.setBackendResponseTime(service, responseTime); + if (timeCost) + queryTimeCost.setBackendResponseTime(service.getConnection().getId(), responseTime); } // start processing the response package (the first package taken out of the BackendService.taskQueue) public void startExecuteBackend() { - sqlCosting(c -> c.startExecuteBackend()); + if (timeCost) + queryTimeCost.startExecuteBackend(); } // When multiple nodes are queried, all nodes return the point in time of the EOF package public void allBackendConnReceive() { - sqlCosting(c -> c.allBackendConnReceive()); + if (timeCost) + queryTimeCost.allBackendConnReceive(); } public void setBackendSqlAddRows(MySQLResponseService service) { - sqlTracking(t -> t.setBackendSqlAddRows(service, null)); + if (isTrace) + traceResult.setBackendSqlAddRows(service, null); } public void setBackendSqlSetRows(MySQLResponseService service, long rows) { - sqlTracking(t -> t.setBackendSqlAddRows(service, rows)); + if (isTrace) + traceResult.setBackendSqlAddRows(service, rows); } // the final response package received,(include connection is accidentally closed or released) public void setBackendResponseEndTime(MySQLResponseService service) { sessionStage = SessionStage.First_Node_Fetched_Result; - sqlTracking(t -> t.setBackendResponseEndTime(service, System.nanoTime())); - sqlCosting(c -> c.setBackendResponseEndTime()); + if (isTrace) + traceResult.setBackendResponseEndTime(service, System.nanoTime()); + if (timeCost) + queryTimeCost.setBackendResponseEndTime(); } public void setBackendTerminateByComplex(MultiNodeMergeHandler mergeHandler) { - sqlTracking(t -> t.setBackendTerminateByComplex(mergeHandler, System.nanoTime())); + if (isTrace) + traceResult.setBackendTerminateByComplex(mergeHandler, System.nanoTime()); } public void setBackendResponseTxEnd(MySQLResponseService service) { - sqlTracking(t -> t.setBackendResponseTxEnd(service, System.nanoTime())); + if (isTrace) + traceResult.setBackendResponseTxEnd(service, System.nanoTime()); } public void setBackendResponseClose(MySQLResponseService service) { - sqlTracking(t -> t.setBackendResponseTxEnd(service, System.nanoTime())); + if (isTrace) + traceResult.setBackendResponseTxEnd(service, System.nanoTime()); } public void setFrontendAddRows() { - sqlTracking(t -> t.setFrontendAddRows()); + if (isTrace) + traceResult.setFrontendAddRows(); } public void setFrontendSetRows(long rows) { - sqlTracking(t -> t.setFrontendSetRows(rows)); + if (isTrace) + traceResult.setFrontendSetRows(rows); } // get the rows、 netOutBytes、resultSize information in the last handler public void doSqlStat(long sqlRows, long netOutBytes, long resultSize) { - sqlTracking(t -> t.setSqlStat(sqlRows, netOutBytes, resultSize)); + if (isTrace) + traceResult.setSqlStat(sqlRows, netOutBytes, resultSize); } public void setResponseTime(boolean isSuccess) { sessionStage = SessionStage.Finished; long responseTime = System.nanoTime(); - sqlTracking(t -> t.setResponseTime(isSuccess, responseTime, System.currentTimeMillis())); - sqlCosting(t -> t.setResponseTime(responseTime)); + if (isTrace) + traceResult.setResponseTime(isSuccess, responseTime, System.currentTimeMillis()); + if (timeCost) + queryTimeCost.setResponseTime(responseTime); } public void setExit() { - sqlTracking(t -> t.setExit()); + if (isTrace) + traceResult.setExit(); } public void setBeginCommitTime() { sessionStage = SessionStage.Distributed_Transaction_Commit; - sqlTracking(t -> t.setAdtCommitBegin(System.nanoTime())); + if (isTrace) + traceResult.setAdtCommitBegin(System.nanoTime()); } public void setFinishedCommitTime() { - sqlTracking(t -> t.setAdtCommitEnd(System.nanoTime())); + if (isTrace) + traceResult.setAdtCommitEnd(System.nanoTime()); } // record the start time of each handler in the complex-query public void setHandlerStart(DMLResponseHandler handler) { - sqlTracking(t -> t.addToRecordStartMap(handler, System.nanoTime())); + if (isTrace) + traceResult.addToRecordStartMap(handler, System.nanoTime()); } // record the end time of each handler in the complex-query @@ -185,38 +226,45 @@ public class TrackProbe extends AbstractTrackProbe { DMLResponseHandler next = handler.getNextHandler(); sessionStage = SessionStage.changeFromHandlerType(next.type()); } - sqlTracking(t -> t.addToRecordEndMap(handler, System.nanoTime())); + if (isTrace) + traceResult.addToRecordEndMap(handler, System.nanoTime()); } public void setTraceBuilder(BaseHandlerBuilder baseBuilder) { - sqlTracking(t -> t.setBuilder(baseBuilder)); + if (isTrace) + traceResult.setBuilder(baseBuilder); } public void setTraceSimpleHandler(ResponseHandler simpleHandler) { - sqlTracking(t -> t.setSimpleHandler(simpleHandler)); + if (isTrace) + traceResult.setSimpleHandler(simpleHandler); } - private void sqlTracking(Consumer consumer) { + /*private void sqlTracking(Consumer consumer) { try { if (isTrace) { - Optional.ofNullable(traceResult).ifPresent(consumer); + if (traceResult != null) { + consumer.accept(traceResult); + } } } catch (Exception e) { // Should not affect the main task LOGGER.warn("sqlTracking occurred ", e); } - } + }*/ - private void sqlCosting(Consumer costConsumer) { + /*private void sqlCosting(Consumer costConsumer) { try { if (timeCost) { - Optional.ofNullable(queryTimeCost).ifPresent(costConsumer); + if (queryTimeCost != null) { + costConsumer.accept(queryTimeCost); + } } } catch (Exception e) { // Should not affect the main task LOGGER.warn("sqlCosting occurred ", e); } - } + }*/ public SessionStage getSessionStage() { return sessionStage; diff --git a/src/main/java/com/actiontech/dble/util/SqlStringUtil.java b/src/main/java/com/actiontech/dble/util/SqlStringUtil.java index ce58d00db..3be62a343 100644 --- a/src/main/java/com/actiontech/dble/util/SqlStringUtil.java +++ b/src/main/java/com/actiontech/dble/util/SqlStringUtil.java @@ -36,6 +36,10 @@ public final class SqlStringUtil { String type; switch (sqlType) { case ServerParse.DDL: + case ServerParse.CREATE_DATABASE: + case ServerParse.CREATE_VIEW: + case ServerParse.DROP_VIEW: + case ServerParse.DROP_TABLE: type = "DDL"; break; case ServerParse.INSERT: