[inner-1803] optimize slow log and trace (#3310)

This commit is contained in:
wenyh
2022-07-26 16:59:53 +08:00
committed by GitHub
parent 9b5387bd13
commit 39169c8a85
30 changed files with 571 additions and 548 deletions

View File

@@ -7,6 +7,7 @@ package com.actiontech.dble.backend.mysql;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.net.mysql.BinaryPacket;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
@@ -58,7 +59,7 @@ public final class LoadDataUtil {
//send empty packet
byte[] empty = new byte[]{0, 0, 0, 3};
empty[3] = ++packId;
service.write(empty, WriteFlags.QUERY_END);
service.write(empty, WriteFlags.QUERY_END, ResultFlag.OTHER);
}
}

View File

@@ -175,7 +175,6 @@ public class LockTablesHandler extends MultiNodeHandler implements ExecutableHan
}
private void handleEndPacket(MySQLPacket packet, boolean isSuccess) {
session.setResponseTime(isSuccess);
session.clearResources(false);
packet.write(session.getSource());
}

View File

@@ -704,7 +704,6 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
if (inTransaction && (AutoTxOperation.ROLLBACK == txOperation)) {
service.setTxInterrupt("ROLLBACK");
}
session.setResponseTime(isSuccess);
curPacket.write(session.getSource());
}

View File

@@ -16,6 +16,7 @@ import com.actiontech.dble.log.transaction.TxnLogHelper;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
@@ -428,7 +429,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
if (requestScope.isUsingCursor()) {
recycle();
requestScope.getCurrentPreparedStatement().getCursorCache().done();
session.getShardingService().writeDirectly(byteBuffer, WriteFlags.QUERY_END);
session.getShardingService().writeDirectly(byteBuffer, WriteFlags.QUERY_END, ResultFlag.EOF_ROW);
return;
}
this.resultSize += eof.length;
@@ -438,7 +439,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
if (this.isFail()) {
session.setResponseTime(false);
session.resetMultiStatementStatus();
if (session.closed()) {
cleanBuffer();
@@ -468,7 +468,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
session.releaseConnections(false);
}
}
session.setResponseTime(!this.isFail());
}
@Override
@@ -566,7 +565,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
cleanBuffer();
} else {
ErrorPacket errorPacket = createErrPkg(this.error, err.getErrNo());
session.getShardingService().writeDirectly(byteBuffer, WriteFlags.QUERY_END);
session.getShardingService().writeDirectly(byteBuffer, WriteFlags.QUERY_END, ResultFlag.ERROR);
handleEndPacket(errorPacket, AutoTxOperation.ROLLBACK, false);
}
}
@@ -585,7 +584,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("last packet id:" + (byte) session.getShardingService().getPacketId().get());
}
session.setResponseTime(true);
QueryResultDispatcher.doSqlStat(rrs, session, selectRows, netOutBytes, resultSize);
eofRowPacket.write(byteBuffer, source);
}
@@ -707,7 +705,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
if (inTransaction && (AutoTxOperation.ROLLBACK == txOperation)) {
service.setTxInterrupt("ROLLBACK");
}
session.setResponseTime(isSuccess);
packet.write(session.getSource());
}

View File

@@ -14,6 +14,7 @@ import com.actiontech.dble.log.transaction.TxnLogHelper;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
@@ -216,7 +217,6 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
}
if (buffer != null) {
/* SELECT 9223372036854775807 + 1; response: field_count, field, eof, err */
session.setResponseTime(false);
errPkg.write(buffer, shardingService);
} else {
errPkg.write(shardingService.getConnection());
@@ -259,7 +259,6 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
shardingService.setLastInsertId(ok.getInsertId());
session.setBackendResponseEndTime((MySQLResponseService) service);
session.releaseConnectionIfSafe((MySQLResponseService) service, false);
session.setResponseTime(true);
session.multiStatementPacket(ok);
QueryResultDispatcher.doSqlStat(rrs, session, selectRows, netOutBytes, resultSize);
if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
@@ -298,11 +297,10 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
eofRowPacket.read(eof);
ShardingService shardingService = session.getShardingService();
session.setResponseTime(true);
QueryResultDispatcher.doSqlStat(rrs, session, selectRows, netOutBytes, resultSize);
if (requestScope.isUsingCursor()) {
requestScope.getCurrentPreparedStatement().getCursorCache().done();
session.getShardingService().writeDirectly(buffer, WriteFlags.QUERY_END);
session.getShardingService().writeDirectly(buffer, WriteFlags.QUERY_END, ResultFlag.EOF_ROW);
}
lock.lock();
try {

View File

@@ -357,7 +357,6 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl
protected void handleEndPacket(MySQLPacket packet) {
session.clearResources(false);
this.clearResources();
session.setResponseTime(packet instanceof OkPacket);
DDLTraceHelper.finish(session.getShardingService());
packet.write(session.getSource());
}

View File

@@ -12,6 +12,7 @@ import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.net.Session;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.RequestScope;
@@ -185,8 +186,7 @@ public class OutputHandler extends BaseDMLHandler {
requestScope.getCurrentPreparedStatement().getCursorCache().done();
HandlerTool.terminateHandlerTree(this);
serverSession.setHandlerEnd(this);
serverSession.setResponseTime(true);
serverSession.getShardingService().writeDirectly(buffer, WriteFlags.QUERY_END);
serverSession.getShardingService().writeDirectly(buffer, WriteFlags.QUERY_END, ResultFlag.EOF_ROW);
return;
}
lock.lock();
@@ -203,7 +203,6 @@ public class OutputHandler extends BaseDMLHandler {
doSqlStat();
HandlerTool.terminateHandlerTree(this);
serverSession.setHandlerEnd(this);
serverSession.setResponseTime(true);
eofPacket.write(buffer, shardingService);
} finally {
lock.unlock();

View File

@@ -41,7 +41,6 @@ public class OutputHandlerForPrepare extends OutputHandler {
requestScope.getCurrentPreparedStatement().onPrepareOk(fieldPackets.size());
HandlerTool.terminateHandlerTree(this);
serverSession.setHandlerEnd(this);
serverSession.setResponseTime(true);
return;
}

View File

@@ -11,6 +11,7 @@ import com.actiontech.dble.net.Session;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.services.manager.ManagerSession;
import org.jetbrains.annotations.NotNull;
@@ -56,7 +57,7 @@ public class ManagerOutputHandler extends BaseDMLHandler {
lock.lock();
try {
buffer = managerSession.getSource().getService().writeToBuffer(err, buffer);
managerSession.getSource().getService().writeDirectly(buffer, WriteFlags.SESSION_END);
managerSession.getSource().getService().writeDirectly(buffer, WriteFlags.SESSION_END, ResultFlag.ERROR);
} finally {
lock.unlock();
}
@@ -137,7 +138,7 @@ public class ManagerOutputHandler extends BaseDMLHandler {
byte[] eof = eofPacket.toBytes();
buffer = source.getService().writeToBuffer(eof, buffer);
managerSession.setHandlerEnd(this);
source.getService().writeDirectly(buffer, WriteFlags.QUERY_END);
source.getService().writeDirectly(buffer, WriteFlags.QUERY_END, ResultFlag.EOF_ROW);
} finally {
lock.unlock();
}

View File

@@ -71,7 +71,6 @@ public class CommitStage extends Stage implements TransactionStage {
private void asyncNext(boolean isFail, String errMsg, MySQLPacket sendData) {
if (isFail) {
session.setFinishedCommitTime();
session.setResponseTime(false);
if (sendData != null) {
sendData.write(session.getSource());
} else {
@@ -82,7 +81,6 @@ public class CommitStage extends Stage implements TransactionStage {
handler.next();
} else {
session.setFinishedCommitTime();
session.setResponseTime(true);
if (sendData != null) {
session.getShardingService().write(sendData);
} else {

View File

@@ -41,7 +41,6 @@ public class RollbackStage implements TransactionStage {
return null;
}
session.setResponseTime(false);
LOGGER.info("GET INTO THE NET LEVEL AND THE RESULT IS " + isFail);
if (isFail) {

View File

@@ -70,7 +70,6 @@ public abstract class XAStage implements TransactionStage {
return;
}
}
session.setResponseTime(isSuccess);
MySQLPacket sendData = xaHandler.getPacketIfSuccess();
if (sendData != null) {
sendData.write(session.getSource());

View File

@@ -118,16 +118,14 @@ public class SlowQueryLogProcessor extends Thread {
}
public void putSlowQueryLog(ShardingService service, TraceResult log) {
if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
try {
final boolean enQueue = queue.offer(logEntry, 3, TimeUnit.SECONDS);
if (!enQueue) {
LOGGER.warn("slow log queue has so many item. Discard log entry: {} ", logEntry.toString());
}
} catch (InterruptedException e) {
LOGGER.info(" ", e);
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
try {
final boolean enQueue = queue.offer(logEntry, 3, TimeUnit.SECONDS);
if (!enQueue) {
LOGGER.warn("slow log queue has so many item. Discard log entry: {} ", logEntry.toString());
}
} catch (InterruptedException e) {
LOGGER.info(" ", e);
}
}
}

View File

@@ -8,6 +8,7 @@ package com.actiontech.dble.net.connection;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlag;
import javax.annotation.Nonnull;
@@ -21,16 +22,20 @@ import java.util.EnumSet;
public interface WriteAbleService {
AbstractConnection getConnection();
default void writeDirectly(ByteBuffer buffer, @Nonnull EnumSet<WriteFlag> writeFlags) {
writeDirectly(buffer, writeFlags, ResultFlag.OTHER);
}
/**
* the common method to write to connection.
*
* @param buffer
* @param writeFlags
*/
default void writeDirectly(ByteBuffer buffer, @Nonnull EnumSet<WriteFlag> writeFlags) {
default void writeDirectly(ByteBuffer buffer, @Nonnull EnumSet<WriteFlag> writeFlags, ResultFlag resultFlag) {
final boolean end = writeFlags.contains(WriteFlag.END_OF_QUERY) || writeFlags.contains(WriteFlag.END_OF_SESSION);
if (end) {
beforeWriteFinish(writeFlags);
beforeWriteFinish(writeFlags, resultFlag);
}
getConnection().innerWrite(buffer, writeFlags);
@@ -39,10 +44,13 @@ public interface WriteAbleService {
}
}
void beforeWriteFinish(@Nonnull EnumSet<WriteFlag> writeFlags);
void beforeWriteFinish(@Nonnull EnumSet<WriteFlag> writeFlags, ResultFlag resultFlag);
void afterWriteFinish(@Nonnull EnumSet<WriteFlag> writeFlags);
default void write(byte[] data, @Nonnull EnumSet<WriteFlag> writeFlags) {
write(data, writeFlags, ResultFlag.OTHER);
}
/**
* NOTICE: this method is not a good practice,may deprecated in the future
@@ -54,10 +62,10 @@ public interface WriteAbleService {
* @param data
* @param writeFlags
*/
default void write(byte[] data, @Nonnull EnumSet<WriteFlag> writeFlags) {
default void write(byte[] data, @Nonnull EnumSet<WriteFlag> writeFlags, ResultFlag resultFlag) {
ByteBuffer buffer = getConnection().allocate();
ByteBuffer writeBuffer = writeToBuffer(data, buffer);
this.writeDirectly(writeBuffer, writeFlags);
this.writeDirectly(writeBuffer, writeFlags, resultFlag);
}
/**
@@ -71,7 +79,7 @@ public interface WriteAbleService {
}
default void writeWithBuffer(MySQLPacket packet, ByteBuffer buffer) {
this.writeDirectly(writeToBuffer(packet, buffer), packet.getLastWriteFlag());
this.writeDirectly(writeToBuffer(packet, buffer), packet.getLastWriteFlag(), packet.getResultFlag());
}
default ByteBuffer checkWriteBuffer(ByteBuffer buffer, int capacity, boolean writeSocketIfFull) {

View File

@@ -6,6 +6,7 @@
package com.actiontech.dble.net.mysql;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import java.nio.ByteBuffer;
@@ -23,4 +24,9 @@ public class EOFRowPacket extends EOFPacket {
public boolean isEndOfQuery() {
return true;
}
@Override
public ResultFlag getResultFlag() {
return ResultFlag.EOF_ROW;
}
}

View File

@@ -9,6 +9,7 @@ import com.actiontech.dble.backend.mysql.BufferUtil;
import com.actiontech.dble.backend.mysql.MySQLMessage;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.singleton.BufferPoolManager;
import java.nio.ByteBuffer;
@@ -112,7 +113,7 @@ public class ErrorPacket extends MySQLPacket {
public void bufferWrite(AbstractConnection c) {
ByteBuffer buffer = c.allocate();
buffer = this.write(buffer, c.getService(), true);
c.getService().writeDirectly(buffer, getLastWriteFlag());
c.getService().writeDirectly(buffer, getLastWriteFlag(), getResultFlag());
}
@Override
@@ -178,4 +179,9 @@ public class ErrorPacket extends MySQLPacket {
public boolean isEndOfSession() {
return true;
}
@Override
public ResultFlag getResultFlag() {
return ResultFlag.ERROR;
}
}

View File

@@ -7,6 +7,7 @@ package com.actiontech.dble.net.mysql;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
@@ -209,7 +210,7 @@ public abstract class MySQLPacket {
public void bufferWrite(AbstractConnection connection) {
ByteBuffer buffer = connection.allocate();
buffer = this.write(buffer, connection.getService(), true);
connection.getService().writeDirectly(buffer, getLastWriteFlag());
connection.getService().writeDirectly(buffer, getLastWriteFlag(), getResultFlag());
}
@@ -223,6 +224,10 @@ public abstract class MySQLPacket {
}
}
public ResultFlag getResultFlag() {
return ResultFlag.OTHER;
}
/**
* calcPacketSize,not contains header size
*/

View File

@@ -9,6 +9,7 @@ import com.actiontech.dble.backend.mysql.BufferUtil;
import com.actiontech.dble.backend.mysql.MySQLMessage;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.singleton.BufferPoolManager;
import java.nio.ByteBuffer;
@@ -107,7 +108,7 @@ public class OkPacket extends MySQLPacket {
@Override
public void bufferWrite(AbstractConnection c) {
ByteBuffer buffer = write(c.allocate(), c);
c.getService().writeDirectly(buffer, getLastWriteFlag());
c.getService().writeDirectly(buffer, getLastWriteFlag(), getResultFlag());
}
@@ -197,4 +198,9 @@ public class OkPacket extends MySQLPacket {
public boolean isEndOfQuery() {
return true;
}
@Override
public ResultFlag getResultFlag() {
return ResultFlag.OK;
}
}

View File

@@ -52,7 +52,7 @@ public abstract class AbstractService extends VariablesService implements Servic
}
@Override
public void beforeWriteFinish(@Nonnull EnumSet<WriteFlag> writeFlags) {
public void beforeWriteFinish(@Nonnull EnumSet<WriteFlag> writeFlags, ResultFlag resultFlag) {
if (writeFlags.contains(WriteFlag.END_OF_QUERY)) {
TraceManager.sessionFinish(this);
} else if (writeFlags.contains(WriteFlag.END_OF_SESSION)) {

View File

@@ -0,0 +1,11 @@
package com.actiontech.dble.net.service;
/**
* response to the results of front connection
*/
public enum ResultFlag {
OK,
EOF_ROW,
ERROR,
OTHER // Default
}

View File

@@ -41,7 +41,6 @@ import com.actiontech.dble.route.parser.util.ParseUtil;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.status.LoadDataBatch;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.server.trace.TraceRecord;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
@@ -70,6 +69,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import static com.actiontech.dble.meta.PauseEndThreadPool.CONTINUE_TYPE_MULTIPLE;
import static com.actiontech.dble.meta.PauseEndThreadPool.CONTINUE_TYPE_SINGLE;
@@ -137,15 +137,21 @@ public class NonBlockingSession extends Session {
this.outputHandler = outputHandler;
}
private void sqlTracking(Consumer<TraceResult> consumer) {
try {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
Optional.ofNullable(traceResult).ifPresent(consumer);
}
} catch (Exception e) {
// Should not affect the main task
LOGGER.warn("sqlTracking occurred: {}", e);
}
}
public void setRequestTime() {
sessionStage = SessionStage.Read_SQL;
sqlTracking(t -> t.setRequestTime());
long requestTime = 0;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
requestTime = System.nanoTime();
traceResult.setVeryStartPrepare(requestTime);
}
if (SystemConfig.getInstance().getUseCostTimeStat() == 0) {
return;
}
@@ -161,9 +167,8 @@ public class NonBlockingSession extends Session {
provider = new CostTimeProvider();
xprovider = new ComplexQueryProvider();
provider.beginRequest(shardingService.getConnection().getId());
if (requestTime == 0) {
requestTime = System.nanoTime();
}
long requestTime = System.nanoTime();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("frontend connection setRequestTime:" + requestTime);
}
@@ -172,9 +177,8 @@ public class NonBlockingSession extends Session {
public void startProcess() {
sessionStage = SessionStage.Parse_SQL;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.startProcess());
if (!timeCost) {
return;
}
@@ -183,22 +187,18 @@ public class NonBlockingSession extends Session {
public void endParse() {
sessionStage = SessionStage.Route_Calculation;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.ready();
traceResult.setRouteStart(new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.endParse());
if (!timeCost) {
return;
}
provider.endParse(shardingService.getConnection().getId());
}
public void endRoute(RouteResultset rrs) {
sessionStage = SessionStage.Prepare_to_Push;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setPreExecuteStart(new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.endRoute());
if (!timeCost) {
return;
}
@@ -229,12 +229,7 @@ public class NonBlockingSession extends Session {
public void setPreExecuteEnd(TraceResult.SqlTraceType type) {
sessionStage = SessionStage.Execute_SQL;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setType(type);
traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime()));
traceResult.clearConnReceivedMap();
traceResult.clearConnFlagMap();
}
sqlTracking(t -> t.setPreExecuteEnd(type));
}
public long getRowCount() {
@@ -242,9 +237,7 @@ public class NonBlockingSession extends Session {
}
public void setSubQuery() {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setSubQuery(true);
}
sqlTracking(t -> t.setSubQuery());
}
public void setBackendRequestTime(MySQLResponseService service) {
@@ -260,33 +253,18 @@ public class NonBlockingSession extends Session {
}
backendCost.setRequestTime(requestTime);
queryTimeCost.getBackEndTimeCosts().put(backendID, backendCost);
}
public void setBackendResponseTime(MySQLResponseService service) {
sessionStage = SessionStage.Fetching_Result;
// Optional.ofNullable(StatisticListener2.getInstance().getRecorder(this, r ->r.onBackendSqlFirstEnd(service));
long responseTime = 0;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + +node.getStatementHash();
if (traceResult.addToConnFlagMap(key) == null) {
ResponseHandler responseHandler = service.getResponseHandler();
responseTime = System.nanoTime();
TraceRecord record = new TraceRecord(responseTime, node.getName(), node.getStatement());
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
connMap.put(key, record);
traceResult.addToConnReceivedMap(responseHandler, connMap);
}
}
long responseTime = System.nanoTime();
sqlTracking(t -> t.setBackendResponseTime(service, responseTime));
if (!timeCost) {
return;
}
QueryTimeCost backCost = queryTimeCost.getBackEndTimeCosts().get(service.getConnection().getId());
if (responseTime == 0) {
responseTime = System.nanoTime();
}
if (backCost != null && backCost.getResponseTime().compareAndSet(0, responseTime)) {
if (queryTimeCost.getFirstBackConRes().compareAndSet(false, true)) {
if (LOGGER.isDebugEnabled()) {
@@ -324,20 +302,13 @@ public class NonBlockingSession extends Session {
public void setResponseTime(boolean isSuccess) {
sessionStage = SessionStage.Finished;
long responseTime = 0;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
responseTime = System.nanoTime();
traceResult.setVeryEnd(responseTime);
if (isSuccess) {
SlowQueryLog.getInstance().putSlowQueryLog(this.shardingService, (TraceResult) traceResult.clone());
}
}
sqlTracking(t -> t.setResponseTime(shardingService, isSuccess));
if (!timeCost) {
return;
}
if (responseTime == 0) {
responseTime = System.nanoTime();
}
long responseTime = System.nanoTime();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("setResponseTime:" + responseTime);
}
@@ -346,22 +317,10 @@ public class NonBlockingSession extends Session {
QueryTimeCostContainer.getInstance().add(queryTimeCost);
}
public void setStageFinished() {
sessionStage = SessionStage.Finished;
}
public void setBackendResponseEndTime(MySQLResponseService service) {
sessionStage = SessionStage.First_Node_Fetched_Result;
StatisticListener.getInstance().record(this, r -> r.onBackendSqlEnd(service));
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
ResponseHandler responseHandler = service.getResponseHandler();
TraceRecord record = new TraceRecord(System.nanoTime(), node.getName(), node.getStatement());
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + +node.getStatementHash();
connMap.put(key, record);
traceResult.addToConnFinishedMap(responseHandler, connMap);
}
sqlTracking(t -> t.setBackendResponseEndTime(service));
if (!timeCost) {
return;
@@ -373,21 +332,15 @@ public class NonBlockingSession extends Session {
public void setBeginCommitTime() {
sessionStage = SessionStage.Distributed_Transaction_Commit;
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setAdtCommitBegin(new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.setAdtCommitBegin());
}
public void setFinishedCommitTime() {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setAdtCommitEnd(new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.setAdtCommitEnd());
}
public void setHandlerStart(DMLResponseHandler handler) {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.addToRecordStartMap(handler, new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.addToRecordStartMap(handler));
}
public void setHandlerEnd(DMLResponseHandler handler) {
@@ -395,14 +348,12 @@ public class NonBlockingSession extends Session {
DMLResponseHandler next = handler.getNextHandler();
sessionStage = SessionStage.changeFromHandlerType(next.type());
}
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.addToRecordEndMap(handler, new TraceRecord(System.nanoTime()));
}
sqlTracking(t -> t.addToRecordEndMap(handler));
}
public List<String[]> genTraceResult() {
if (traceEnable) {
return traceResult.genTraceResult();
return traceResult.genShowTraceResult();
} else {
return null;
}
@@ -410,7 +361,7 @@ public class NonBlockingSession extends Session {
public List<String[]> genRunningSQLStage() {
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
TraceResult tmpResult = (TraceResult) traceResult.clone();
TraceResult tmpResult = traceResult.clone();
return tmpResult.genRunningSQLStage();
} else {
return null;
@@ -544,7 +495,6 @@ public class NonBlockingSession extends Session {
}
executableHandler.clearAfterFailExecute();
}
setResponseTime(false);
DDLTraceHelper.finish(shardingService);
shardingService.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.getMessage());
} finally {
@@ -581,7 +531,6 @@ public class NonBlockingSession extends Session {
LOGGER.info(String.valueOf(shardingService) + rrs, e);
executableHandler.writeRemainBuffer();
executableHandler.clearAfterFailExecute();
setResponseTime(false);
shardingService.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
} finally {
@@ -593,9 +542,7 @@ public class NonBlockingSession extends Session {
}
public void setTraceBuilder(BaseHandlerBuilder baseBuilder) {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setBuilder(baseBuilder);
}
sqlTracking(t -> t.setBuilder(baseBuilder));
}
private void executeMultiResultSet(RouteResultset rrs, PlanNode node) {
@@ -1064,9 +1011,7 @@ public class NonBlockingSession extends Session {
}
public void setTraceSimpleHandler(ResponseHandler simpleHandler) {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setSimpleHandler(simpleHandler);
}
sqlTracking(t -> t.setSimpleHandler(simpleHandler));
}
public RouteResultset getComplexRrs() {

View File

@@ -24,6 +24,7 @@ import com.actiontech.dble.net.handler.LoadDataInfileHandler;
import com.actiontech.dble.net.mysql.BinaryPacket;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.net.mysql.RequestFilePacket;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.route.LoadDataRouteResultsetNode;
import com.actiontech.dble.route.RouteResultset;
@@ -216,7 +217,7 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler
filePacket.setFileName(fileName.getBytes());
filePacket.setPacketId(1);
buffer = filePacket.write(buffer, service, true);
service.writeDirectly(buffer, WriteFlags.QUERY_END);
service.writeDirectly(buffer, WriteFlags.QUERY_END, ResultFlag.OTHER);
} else {
if (!new File(fileName).exists()) {
String msg = fileName + " is not found!";

View File

@@ -14,6 +14,7 @@ import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.general.GeneralLogHelper;
import com.actiontech.dble.net.handler.FrontendPrepareHandler;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.server.RequestScope;
import com.actiontech.dble.server.parser.PrepareChangeVisitor;
@@ -258,7 +259,7 @@ public class ServerPrepareHandler implements FrontendPrepareHandler {
ok.setStatus(statusFlag);
ok.setWarningCount(0);
ok.write(buffer, service, true);
service.writeDirectly(buffer, WriteFlags.QUERY_END);
service.writeDirectly(buffer, WriteFlags.QUERY_END, ResultFlag.OTHER);
} finally {

View File

@@ -9,6 +9,7 @@ import com.actiontech.dble.backend.mysql.PreparedStatement;
import com.actiontech.dble.net.mysql.EOFPacket;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.PreparedOkPacket;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
@@ -59,7 +60,7 @@ public final class PreparedStmtResponse {
}
// send buffer
service.writeDirectly(buffer, WriteFlags.QUERY_END);
service.writeDirectly(buffer, WriteFlags.QUERY_END, ResultFlag.OTHER);
}
}

View File

@@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory;
public class TraceRecord implements Cloneable {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceRecord.class);
private final String shardingNode;
private final String ref;
private long timestamp;
@@ -18,11 +19,16 @@ public class TraceRecord implements Cloneable {
this(timestamp, "-", "-");
}
public static TraceRecord currenTime() {
return new TraceRecord(System.nanoTime());
}
public TraceRecord(long timestamp, String shardingNode, String ref) {
this.timestamp = timestamp;
this.shardingNode = shardingNode;
this.ref = ref;
}
public String getShardingNode() {
return shardingNode;
}

View File

@@ -13,6 +13,9 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.plan.util.ComplexQueryPlanUtil;
import com.actiontech.dble.plan.util.ReferenceHandlerInfo;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,16 +27,11 @@ public class TraceResult implements Cloneable {
public enum SqlTraceType {
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY;
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, SIMPLE_QUERY
}
private static final Logger LOGGER = LoggerFactory.getLogger(TraceResult.class);
private boolean prepareFinished = false;
private long veryStartPrepare;
private long veryStart;
private TraceRecord requestStartPrepare;
private TraceRecord requestStart;
private TraceRecord parseStartPrepare; //requestEnd
private TraceRecord parseStart; //requestEnd
private TraceRecord routeStart; //parseEnd
private TraceRecord preExecuteStart; //routeEnd
@@ -50,34 +48,74 @@ public class TraceResult implements Cloneable {
private ConcurrentMap<ResponseHandler, Map<String, TraceRecord>> connFinishedMap = new ConcurrentHashMap<>();
private ConcurrentMap<DMLResponseHandler, TraceRecord> recordStartMap = new ConcurrentHashMap<>();
private ConcurrentMap<DMLResponseHandler, TraceRecord> recordEndMap = new ConcurrentHashMap<>();
private long veryEnd;
private SqlTraceType type;
private TraceRecord requestEnd;
private boolean subQuery = false;
private TraceResult previous = null;
public void setVeryStartPrepare(long veryStartPrepare) {
prepareFinished = false;
this.veryStartPrepare = veryStartPrepare;
this.requestStartPrepare = new TraceRecord(veryStartPrepare);
public void setRequestTime() {
copyToPrevious();
reset();
this.requestStart = TraceRecord.currenTime();
}
public void setRouteStart(TraceRecord routeStart) {
this.routeStart = routeStart;
public void startProcess() {
this.parseStart = TraceRecord.currenTime();
}
public void setParseStartPrepare(TraceRecord parseStartPrepare) {
this.parseStartPrepare = parseStartPrepare;
public void endParse() {
this.routeStart = TraceRecord.currenTime();
}
public void setPreExecuteStart(TraceRecord preExecuteStart) {
this.preExecuteStart = preExecuteStart;
public void endRoute() {
this.preExecuteStart = TraceRecord.currenTime();
}
public void setPreExecuteEnd(TraceRecord preExecuteEnd) {
this.preExecuteEnd = preExecuteEnd;
public void setPreExecuteEnd(SqlTraceType type0) {
this.type = type0;
this.preExecuteEnd = TraceRecord.currenTime();
clearConnReceivedMap();
clearConnFlagMap();
}
public RouteResultsetNode[] getShardingNodes() {
return shardingNodes;
public void setSubQuery() {
this.subQuery = true;
}
public void setBackendResponseTime(MySQLResponseService service, long responseTime) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + +node.getStatementHash();
ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null && addToConnFlagMap(key) == null) {
TraceRecord record = new TraceRecord(responseTime, node.getName(), node.getStatement());
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
connMap.put(key, record);
addToConnReceivedMap(responseHandler, connMap);
}
}
public void setBackendResponseEndTime(MySQLResponseService service) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
TraceRecord record = new TraceRecord(System.nanoTime(), node.getName(), node.getStatement());
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + +node.getStatementHash();
connMap.put(key, record);
addToConnFinishedMap(responseHandler, connMap);
}
}
public void setResponseTime(final ShardingService shardingService, boolean isSuccess) {
if (this.requestEnd == null) {
this.requestEnd = TraceRecord.currenTime();
if (this.isCompletedV1() &&
isSuccess && getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLog.getInstance().putSlowQueryLog(shardingService, this.clone());
}
}
}
public void setShardingNodes(RouteResultsetNode[] shardingNodes) {
@@ -102,38 +140,30 @@ public class TraceResult implements Cloneable {
this.builder = builder;
}
public void setAdtCommitBegin(TraceRecord adtCommitBegin) {
this.adtCommitBegin = adtCommitBegin;
public void setAdtCommitBegin() {
this.adtCommitBegin = TraceRecord.currenTime();
}
public void setAdtCommitEnd(TraceRecord adtCommitEnd) {
this.adtCommitEnd = adtCommitEnd;
public void setAdtCommitEnd() {
this.adtCommitEnd = TraceRecord.currenTime();
}
public void setType(SqlTraceType type) {
this.type = type;
}
public void setSubQuery(boolean subQuery) {
this.subQuery = subQuery;
}
public Boolean addToConnFlagMap(String item) {
private Boolean addToConnFlagMap(String item) {
return connFlagMap.putIfAbsent(item, true);
}
public void clearConnFlagMap() {
private void clearConnFlagMap() {
connFlagMap.clear();
}
public void addToConnReceivedMap(ResponseHandler responseHandler, Map<String, TraceRecord> connMap) {
private void addToConnReceivedMap(ResponseHandler responseHandler, Map<String, TraceRecord> connMap) {
Map<String, TraceRecord> existReceivedMap = connReceivedMap.putIfAbsent(responseHandler, connMap);
if (existReceivedMap != null) {
existReceivedMap.putAll(connMap);
}
}
public void clearConnReceivedMap() {
private void clearConnReceivedMap() {
connReceivedMap.clear();
}
@@ -144,32 +174,17 @@ public class TraceResult implements Cloneable {
}
}
public void addToRecordStartMap(DMLResponseHandler handler, TraceRecord traceRecord) {
recordStartMap.putIfAbsent(handler, traceRecord);
public void addToRecordStartMap(DMLResponseHandler handler) {
recordStartMap.putIfAbsent(handler, TraceRecord.currenTime());
}
public void addToRecordEndMap(DMLResponseHandler handler, TraceRecord traceRecord) {
recordEndMap.putIfAbsent(handler, traceRecord);
public void addToRecordEndMap(DMLResponseHandler handler) {
recordEndMap.putIfAbsent(handler, TraceRecord.currenTime());
}
public void setVeryEnd(long veryEnd) {
this.veryEnd = veryEnd;
}
public void ready() {
prepareFinished = true;
clear();
veryStart = veryStartPrepare;
requestStart = requestStartPrepare;
parseStart = parseStartPrepare;
veryStartPrepare = 0;
requestStartPrepare = null;
parseStartPrepare = null;
}
private void clear() {
veryStart = 0;
private void reset() {
requestStart = null;
requestEnd = null;
parseStart = null;
routeStart = null;
preExecuteStart = null;
@@ -177,7 +192,7 @@ public class TraceResult implements Cloneable {
shardingNodes = null;
adtCommitBegin = null;
adtCommitEnd = null;
this.type = null;
type = null;
subQuery = false;
simpleHandler = null;
builder = null; //for complex query
@@ -192,274 +207,51 @@ public class TraceResult implements Cloneable {
connFinishedMap.clear();
recordStartMap.clear();
recordEndMap.clear();
veryEnd = 0;
}
private void copyToPrevious() {
this.previous = this.clone();
if (!previous.isCompletedV1()) {
previous = null;
}
}
// show @@connection.sql.status where FRONT_ID=?
public List<String[]> genRunningSQLStage() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("start genRunningSQLStage");
}
List<String[]> lst = new ArrayList<>();
if (!prepareFinished) {
if (requestStartPrepare == null) {
if (requestStart != null) {
if (genTraceRecord(lst, "Read_SQL", requestStart, parseStart))
return lst;
if (genTraceRecord(lst, "Parse_SQL", parseStart, routeStart, requestEnd))
return lst;
if (genTraceRecord(lst, "Route_Calculation", routeStart, preExecuteStart))
return lst;
if (genTraceRecord(lst, "Prepare_to_Push/Optimize", preExecuteStart, preExecuteEnd))
return lst;
if (simpleHandler != null) {
genRunningSimpleResults(lst);
return lst;
} else if (builder != null) {
genRunningComplexQueryResults(lst);
return lst;
} else if (subQuery) {
lst.add(genTraceRecord("Doing_SubQuery", preExecuteEnd.getTimestamp()));
return lst;
} else if (shardingNodes == null || (this.type == SqlTraceType.COMPLEX_QUERY)) {
lst.add(genTraceRecord("Generate_Query_Explain", preExecuteEnd.getTimestamp()));
return lst;
} else {
if (parseStartPrepare == null) {
lst.add(genTraceRecord("Read_SQL", requestStartPrepare.getTimestamp()));
return lst;
} else {
lst.add(genTraceRecord("Read_SQL", requestStartPrepare.getTimestamp(), parseStartPrepare.getTimestamp()));
lst.add(genTraceRecord("Parse_SQL", parseStartPrepare.getTimestamp()));
return lst;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query or unfinished");
}
}
}
lst.add(genTraceRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
if (routeStart == null) {
lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp()));
return lst;
} else {
lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp(), routeStart.getTimestamp()));
}
if (preExecuteStart == null) {
lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp()));
return lst;
} else {
lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
}
if (preExecuteEnd == null) {
lst.add(genTraceRecord("Prepare_to_Push/Optimize", preExecuteStart.getTimestamp()));
return lst;
} else {
lst.add(genTraceRecord("Prepare_to_Push/Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
}
if (simpleHandler != null) {
genRunningSimpleResults(lst);
return lst;
} else if (builder != null) {
genRunningComplexQueryResults(lst);
return lst;
} else if (subQuery) {
lst.add(genTraceRecord("Doing_SubQuery", preExecuteEnd.getTimestamp()));
return lst;
} else if (shardingNodes == null || (this.type == SqlTraceType.COMPLEX_QUERY)) {
lst.add(genTraceRecord("Generate_Query_Explain", preExecuteEnd.getTimestamp()));
return lst;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query or unfinished");
}
return lst;
}
}
public List<String[]> genTraceResult() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("start genTraceResult");
}
if (!isCompleted()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs,veryEnd:" + veryEnd + ",connFlagMap.size:" + connFlagMap.size() +
",connReceivedMap.size:" + connReceivedMap.size() + ",connFinishedMap.size:" + connFinishedMap.size() +
",recordStartMap.size:" + recordStartMap.size() + ",recordEndMap.size:" + recordEndMap.size());
}
return null;
}
List<String[]> lst = new ArrayList<>();
lst.add(genTraceRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp(), routeStart.getTimestamp()));
if (simpleHandler != null) {
if (genSimpleResults(lst)) return null;
} else if (builder != null) {
if (genComplexQueryResults(lst)) return null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query");
}
return null;
}
lst.add(genTraceRecord("Over_All", veryStart, veryEnd));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("end genTraceResult");
}
clear();
return lst;
}
private void genRunningComplexQueryResults(List<String[]> lst) {
List<ReferenceHandlerInfo> results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
long lastChildFinished = preExecuteEnd.getTimestamp();
for (ReferenceHandlerInfo result : results) {
DMLResponseHandler handler = result.getHandler();
if (handler instanceof BaseSelectHandler) {
Map<String, TraceRecord> fetchStartRecordMap = connReceivedMap.get(handler);
if (fetchStartRecordMap == null) {
if (!result.isNestLoopQuery()) {
lst.add(genTraceRecord("Execute_SQL", lastChildFinished, result.getName(), result.getRefOrSQL())); // lastChildFinished may is Long.MAX_VALUE
} else {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE
}
lst.add(genTraceRecord("Fetch_result", result.getName(), result.getRefOrSQL()));
} else {
TraceRecord fetchStartRecord = fetchStartRecordMap.values().iterator().next();
if (!result.isNestLoopQuery()) {
lst.add(genTraceRecord("Execute_SQL", lastChildFinished, fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null) {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE
} else if (handlerEnd == null) {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
}
Map<String, TraceRecord> fetchEndRecordMap = connFinishedMap.get(handler);
if (fetchEndRecordMap == null) {
lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
TraceRecord fetchEndRecord = fetchEndRecordMap.values().iterator().next();
lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
}
} else if (handler instanceof OutputHandler) {
TraceRecord startWrite = recordStartMap.get(handler);
if (startWrite == null) {
lst.add(genTraceRecord("Write_to_Client"));
} else if (veryEnd == 0) {
lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp()));
} else {
lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp(), veryEnd));
}
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null) {
lst.add(genTraceRecord(result.getType()));
} else if (handlerEnd == null) {
lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
if (handler.getNextHandler() == null) {
if (handlerEnd != null) {
lastChildFinished = Math.max(lastChildFinished, handlerEnd.getTimestamp());
} else {
lastChildFinished = Long.MAX_VALUE;
}
}
}
}
}
private boolean genComplexQueryResults(List<String[]> lst) {
lst.add(genTraceRecord("Try_Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
lst.add(genTraceRecord("Try_to_Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
List<ReferenceHandlerInfo> results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
long lastChildFinished = preExecuteEnd.getTimestamp();
for (ReferenceHandlerInfo result : results) {
DMLResponseHandler handler = result.getHandler();
if (handler instanceof BaseSelectHandler) {
Map<String, TraceRecord> fetchStartRecordMap = connReceivedMap.get(handler);
Map<String, TraceRecord> fetchEndRecordMap = connFinishedMap.get(handler);
if (fetchStartRecordMap == null || fetchEndRecordMap == null || fetchStartRecordMap.size() != 1 || fetchEndRecordMap.size() != 1) {
printNoResultDebug(fetchStartRecordMap, fetchEndRecordMap);
return true;
}
TraceRecord fetchStartRecord = fetchStartRecordMap.values().iterator().next();
TraceRecord fetchEndRecord = fetchEndRecordMap.values().iterator().next();
if (!result.isNestLoopQuery()) {
lst.add(genTraceRecord("Execute_SQL", lastChildFinished, fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null || handlerEnd == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for handler" + handler);
}
return true;
}
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else if (handler instanceof OutputHandler) {
TraceRecord startWrite = recordStartMap.get(handler);
if (startWrite == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for OutputHandler");
}
return true;
}
lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp(), veryEnd));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null || handlerEnd == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for handler" + handler);
}
return true;
}
lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
if (handler.getNextHandler() == null) {
lastChildFinished = Math.max(lastChildFinished, handlerEnd.getTimestamp());
}
}
}
return false;
}
private void printNoResultDebug(Map<String, TraceRecord> fetchStartRecordMap, Map<String, TraceRecord> fetchEndRecordMap) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for connection");
if (fetchStartRecordMap != null) {
LOGGER.debug("fetchStartRecordMap size is " + fetchStartRecordMap.size());
}
if (fetchEndRecordMap != null) {
LOGGER.debug("fetchEndRecordMap size is " + fetchEndRecordMap.size());
}
}
}
private boolean genSimpleResults(List<String[]> lst) {
lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
lst.add(genTraceRecord("Prepare_to_Push", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
Map<String, TraceRecord> connFetchStartMap = connReceivedMap.get(simpleHandler);
Map<String, TraceRecord> connFetchEndMap = connFinishedMap.get(simpleHandler);
List<String[]> executeList = new ArrayList<>(connFetchStartMap.size());
List<String[]> fetchList = new ArrayList<>(connFetchStartMap.size());
long minFetchStart = Long.MAX_VALUE;
long maxFetchEnd = 0;
for (Map.Entry<String, TraceRecord> fetchStart : connFetchStartMap.entrySet()) {
TraceRecord fetchStartRecord = fetchStart.getValue();
minFetchStart = Math.min(minFetchStart, fetchStartRecord.getTimestamp());
executeList.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), fetchStartRecord.getTimestamp(), fetchStartRecord.getShardingNode(), fetchStartRecord.getRef()));
TraceRecord fetchEndRecord = connFetchEndMap.get(fetchStart.getKey());
if (fetchEndRecord == null) {
LOGGER.debug("connection fetchEndRecord is null ");
return true;
}
fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), fetchStartRecord.getShardingNode(), fetchStartRecord.getRef()));
maxFetchEnd = Math.max(maxFetchEnd, fetchEndRecord.getTimestamp());
}
lst.addAll(executeList);
lst.addAll(fetchList);
if (adtCommitBegin != null) {
lst.add(genTraceRecord("Distributed_Transaction_Prepare", maxFetchEnd, adtCommitBegin.getTimestamp()));
lst.add(genTraceRecord("Distributed_Transaction_Commit", adtCommitBegin.getTimestamp(), adtCommitEnd.getTimestamp()));
}
lst.add(genTraceRecord("Write_to_Client", minFetchStart, veryEnd));
return false;
}
private void genRunningSimpleResults(List<String[]> lst) {
Map<String, TraceRecord> connFetchStartMap = connReceivedMap.get(simpleHandler);
@@ -509,107 +301,250 @@ public class TraceResult implements Cloneable {
}
if (minFetchStart == Long.MAX_VALUE) {
lst.add(genTraceRecord("Write_to_Client"));
} else if (veryEnd == 0) {
} else if (requestEnd == null) {
lst.add(genTraceRecord("Write_to_Client", minFetchStart));
} else {
lst.add(genTraceRecord("Write_to_Client", minFetchStart, veryEnd));
lst.add(genTraceRecord("Write_to_Client", minFetchStart, requestEnd.getTimestamp()));
}
}
private String[] genTraceRecord(String operation, long start) {
return genTraceRecord(operation, start, "-", "-");
private void genRunningComplexQueryResults(List<String[]> lst) {
List<ReferenceHandlerInfo> results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
long lastChildFinished = preExecuteEnd.getTimestamp();
for (ReferenceHandlerInfo result : results) {
DMLResponseHandler handler = result.getHandler();
if (handler instanceof BaseSelectHandler) {
Map<String, TraceRecord> fetchStartRecordMap = connReceivedMap.get(handler);
if (fetchStartRecordMap == null) {
if (!result.isNestLoopQuery()) {
lst.add(genTraceRecord("Execute_SQL", lastChildFinished, result.getName(), result.getRefOrSQL())); // lastChildFinished may is Long.MAX_VALUE
} else {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE
}
lst.add(genTraceRecord("Fetch_result", result.getName(), result.getRefOrSQL()));
} else {
TraceRecord fetchStartRecord = fetchStartRecordMap.values().iterator().next();
if (!result.isNestLoopQuery()) {
lst.add(genTraceRecord("Execute_SQL", lastChildFinished, fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null) {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished)); // lastChildFinished may is Long.MAX_VALUE
} else if (handlerEnd == null) {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
}
Map<String, TraceRecord> fetchEndRecordMap = connFinishedMap.get(handler);
if (fetchEndRecordMap == null) {
lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
TraceRecord fetchEndRecord = fetchEndRecordMap.values().iterator().next();
lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
}
} else if (handler instanceof OutputHandler) {
TraceRecord startWrite = recordStartMap.get(handler);
if (startWrite == null) {
lst.add(genTraceRecord("Write_to_Client"));
} else if (requestEnd == null) {
lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp()));
} else {
lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp(), requestEnd.getTimestamp()));
}
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null) {
lst.add(genTraceRecord(result.getType()));
} else if (handlerEnd == null) {
lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
}
private String[] genTraceRecord(String operation, long start, String shardingNode, String ref) {
if (start == Long.MAX_VALUE) {
return genTraceRecord(operation, shardingNode, ref);
if (handler.getNextHandler() == null) {
if (handlerEnd != null) {
lastChildFinished = Math.max(lastChildFinished, handlerEnd.getTimestamp());
} else {
lastChildFinished = Long.MAX_VALUE;
}
}
}
}
String[] readQuery = new String[6];
readQuery[0] = operation;
readQuery[1] = nanoToMilliSecond(start - veryStart);
readQuery[2] = "unfinished";
readQuery[3] = "unknown";
readQuery[4] = shardingNode;
readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
return readQuery;
}
private String[] genTraceRecord(String operation, String shardingNode, String ref) {
String[] readQuery = new String[6];
readQuery[0] = operation;
readQuery[1] = "not started";
readQuery[2] = "unfinished";
readQuery[3] = "unknown";
readQuery[4] = shardingNode;
readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
return readQuery;
}
private String[] genTraceRecord(String operation) {
return genTraceRecord(operation, "-", "-");
}
private String[] genTraceRecord(String operation, long start, long end) {
return genTraceRecord(operation, start, end, "-", "-");
}
private String[] genTraceRecord(String operation, long start, long end, String shardingNode, String ref) {
String[] readQuery = new String[6];
readQuery[0] = operation;
readQuery[1] = nanoToMilliSecond(start - veryStart);
readQuery[2] = nanoToMilliSecond(end - veryStart);
readQuery[3] = nanoToMilliSecond(end - start);
readQuery[4] = shardingNode;
readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
return readQuery;
}
private String nanoToMilliSecond(long nano) {
double milliSecond = (double) nano / 1000000;
return String.valueOf(milliSecond);
}
public boolean isCompleted() {
return veryStart != 0 && veryEnd != 0 && connFlagMap.size() != 0 && connReceivedMap.size() == connFinishedMap.size() && recordStartMap.size() == recordEndMap.size();
}
public SqlTraceType getType() {
return this.type;
}
public List<String[]> genLogResult() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("start genLogResult");
}
if (!isCompleted()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs,veryEnd:" + veryEnd + ",connFlagMap.size:" + connFlagMap.size() +
",connReceivedMap.size:" + connReceivedMap.size() + ",connFinishedMap.size:" + connFinishedMap.size() +
",recordStartMap.size:" + connReceivedMap.size() + ",recordEndMap.size:" + connFinishedMap.size());
// show trace
public List<String[]> genShowTraceResult() {
try {
if (this.previous != null) {
return this.previous.genTraceResult();
}
return null;
} finally {
this.previous = null;
}
}
private List<String[]> genTraceResult() {
List<String[]> lst = new ArrayList<>();
lst.add(genLogRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
lst.add(genLogRecord("Prepare_Push", parseStart.getTimestamp(), preExecuteEnd.getTimestamp()));
if (simpleHandler != null) {
if (genSimpleLogs(lst)) return null;
} else if (builder != null) {
if (genComplexQueryLogs(lst)) return null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query");
if (isCompletedV2()) {
lst.add(genTraceRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
lst.add(genTraceRecord("Parse_SQL", parseStart.getTimestamp(), routeStart.getTimestamp()));
if (simpleHandler != null) {
if (genSimpleResults(lst)) return null;
} else if (builder != null) {
if (genComplexQueryResults(lst)) return null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query");
}
return null;
}
} else {
if (isCompletedV1() && type == null) {
genTraceRecord(lst, "Read_SQL", requestStart, parseStart);
genTraceRecord(lst, "Parse_SQL", parseStart, routeStart, requestEnd);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs,requestEnd:" + requestEnd.getTimestamp() + ",connFlagMap.size:" + connFlagMap.size() +
",connReceivedMap.size:" + connReceivedMap.size() + ",connFinishedMap.size:" + connFinishedMap.size() +
",recordStartMap.size:" + recordStartMap.size() + ",recordEndMap.size:" + recordEndMap.size());
}
return null;
}
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("end genLogResult");
if (lst.size() > 0) {
lst.add(genTraceRecord("Over_All", requestStart.getTimestamp(), requestEnd.getTimestamp()));
}
return lst;
}
private boolean genComplexQueryResults(List<String[]> lst) {
lst.add(genTraceRecord("Try_Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
lst.add(genTraceRecord("Try_to_Optimize", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
List<ReferenceHandlerInfo> results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
long lastChildFinished = preExecuteEnd.getTimestamp();
for (ReferenceHandlerInfo result : results) {
DMLResponseHandler handler = result.getHandler();
if (handler instanceof BaseSelectHandler) {
Map<String, TraceRecord> fetchStartRecordMap = connReceivedMap.get(handler);
Map<String, TraceRecord> fetchEndRecordMap = connFinishedMap.get(handler);
if (fetchStartRecordMap == null || fetchEndRecordMap == null || fetchStartRecordMap.size() != 1 || fetchEndRecordMap.size() != 1) {
printNoResultDebug(fetchStartRecordMap, fetchEndRecordMap);
return true;
}
TraceRecord fetchStartRecord = fetchStartRecordMap.values().iterator().next();
TraceRecord fetchEndRecord = fetchEndRecordMap.values().iterator().next();
if (!result.isNestLoopQuery()) {
lst.add(genTraceRecord("Execute_SQL", lastChildFinished, fetchStartRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null || handlerEnd == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for handler" + handler);
}
return true;
}
lst.add(genTraceRecord("Generate_New_Query", lastChildFinished, handlerStart.getTimestamp()));
lst.add(genTraceRecord("Execute_SQL", handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
}
lst.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), result.getName(), result.getRefOrSQL()));
} else if (handler instanceof OutputHandler) {
TraceRecord startWrite = recordStartMap.get(handler);
if (startWrite == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for OutputHandler");
}
return true;
}
lst.add(genTraceRecord("Write_to_Client", startWrite.getTimestamp(), requestEnd.getTimestamp()));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
if (handlerStart == null || handlerEnd == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for handler" + handler);
}
return true;
}
lst.add(genTraceRecord(result.getType(), handlerStart.getTimestamp(), handlerEnd.getTimestamp(), result.getName(), result.getRefOrSQL()));
if (handler.getNextHandler() == null) {
lastChildFinished = Math.max(lastChildFinished, handlerEnd.getTimestamp());
}
}
}
return false;
}
private boolean genSimpleResults(List<String[]> lst) {
lst.add(genTraceRecord("Route_Calculation", routeStart.getTimestamp(), preExecuteStart.getTimestamp()));
lst.add(genTraceRecord("Prepare_to_Push", preExecuteStart.getTimestamp(), preExecuteEnd.getTimestamp()));
Map<String, TraceRecord> connFetchStartMap = connReceivedMap.get(simpleHandler);
Map<String, TraceRecord> connFetchEndMap = connFinishedMap.get(simpleHandler);
List<String[]> executeList = new ArrayList<>(connFetchStartMap.size());
List<String[]> fetchList = new ArrayList<>(connFetchStartMap.size());
long minFetchStart = Long.MAX_VALUE;
long maxFetchEnd = 0;
for (Map.Entry<String, TraceRecord> fetchStart : connFetchStartMap.entrySet()) {
TraceRecord fetchStartRecord = fetchStart.getValue();
minFetchStart = Math.min(minFetchStart, fetchStartRecord.getTimestamp());
executeList.add(genTraceRecord("Execute_SQL", preExecuteEnd.getTimestamp(), fetchStartRecord.getTimestamp(), fetchStartRecord.getShardingNode(), fetchStartRecord.getRef()));
TraceRecord fetchEndRecord = connFetchEndMap.get(fetchStart.getKey());
if (fetchEndRecord == null) {
LOGGER.debug("connection fetchEndRecord is null ");
return true;
}
fetchList.add(genTraceRecord("Fetch_result", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp(), fetchStartRecord.getShardingNode(), fetchStartRecord.getRef()));
maxFetchEnd = Math.max(maxFetchEnd, fetchEndRecord.getTimestamp());
}
lst.addAll(executeList);
lst.addAll(fetchList);
if (adtCommitBegin != null) {
lst.add(genTraceRecord("Distributed_Transaction_Prepare", maxFetchEnd, adtCommitBegin.getTimestamp()));
lst.add(genTraceRecord("Distributed_Transaction_Commit", adtCommitBegin.getTimestamp(), adtCommitEnd.getTimestamp()));
}
lst.add(genTraceRecord("Write_to_Client", minFetchStart, requestEnd.getTimestamp()));
return false;
}
// slow log
public List<String[]> genLogResult() {
List<String[]> lst = new ArrayList<>();
if (isCompletedV2()) {
lst.add(genLogRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
lst.add(genLogRecord("Prepare_Push", parseStart.getTimestamp(), preExecuteEnd.getTimestamp()));
if (simpleHandler != null) {
if (genSimpleLogs(lst)) return null;
} else if (builder != null) {
if (genComplexQueryLogs(lst)) return null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query");
}
return null;
}
} else {
if (isCompletedV1() && type == null) {
lst.add(genLogRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
lst.add(genLogRecord("Inner_Execute", parseStart.getTimestamp(), requestEnd.getTimestamp()));
lst.add(genLogRecord("Write_Client", requestEnd.getTimestamp(), requestEnd.getTimestamp()));
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs,requestEnd:" + requestEnd.getTimestamp() + ",connFlagMap.size:" + connFlagMap.size() +
",connReceivedMap.size:" + connReceivedMap.size() + ",connFinishedMap.size:" + connFinishedMap.size() +
",recordStartMap.size:" + recordStartMap.size() + ",recordEndMap.size:" + recordEndMap.size());
}
return null;
}
}
return lst;
}
private boolean genComplexQueryLogs(List<String[]> lst) {
List<ReferenceHandlerInfo> results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
@@ -648,7 +583,7 @@ public class TraceResult implements Cloneable {
}
return true;
}
lst.add(genLogRecord("Write_Client", startWrite.getTimestamp(), veryEnd));
lst.add(genLogRecord("Write_Client", startWrite.getTimestamp(), requestEnd.getTimestamp()));
} else {
TraceRecord handlerStart = recordStartMap.get(handler);
TraceRecord handlerEnd = recordEndMap.get(handler);
@@ -687,10 +622,98 @@ public class TraceResult implements Cloneable {
}
lst.addAll(executeList);
lst.addAll(fetchList);
lst.add(genLogRecord("Write_Client", minFetchStart, veryEnd));
lst.add(genLogRecord("Write_Client", minFetchStart, requestEnd.getTimestamp()));
return false;
}
private boolean isCompletedV1() {
return requestStart != null && requestEnd != null;
}
public boolean isCompletedV2() {
return isCompletedV1() && connFlagMap.size() != 0 && connReceivedMap.size() == connFinishedMap.size() && recordStartMap.size() == recordEndMap.size();
}
private boolean genTraceRecord(List<String[]> lst, String operation, TraceRecord start0, TraceRecord end0) {
return genTraceRecord(lst, operation, start0, end0, null);
}
private boolean genTraceRecord(List<String[]> lst, String operation, TraceRecord start0, TraceRecord end0, TraceRecord finalEnd) {
if (end0 == null) {
if (finalEnd != null) {
lst.add(genTraceRecord(operation, start0.getTimestamp(), finalEnd.getTimestamp()));
lst.add(genTraceRecord("Write_to_Client", finalEnd.getTimestamp(), finalEnd.getTimestamp()));
} else {
lst.add(genTraceRecord(operation, start0.getTimestamp()));
}
return true;
} else {
lst.add(genTraceRecord(operation, start0.getTimestamp(), end0.getTimestamp()));
return false;
}
}
private String[] genTraceRecord(String operation, long start) {
return genTraceRecord(operation, start, "-", "-");
}
private String[] genTraceRecord(String operation, long start, String shardingNode, String ref) {
if (start == Long.MAX_VALUE) {
return genTraceRecord(operation, shardingNode, ref);
}
String[] readQuery = new String[6];
readQuery[0] = operation;
readQuery[1] = nanoToMilliSecond(start - requestStart.getTimestamp());
readQuery[2] = "unfinished";
readQuery[3] = "unknown";
readQuery[4] = shardingNode;
readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
return readQuery;
}
private String[] genTraceRecord(String operation, String shardingNode, String ref) {
String[] readQuery = new String[6];
readQuery[0] = operation;
readQuery[1] = "not started";
readQuery[2] = "unfinished";
readQuery[3] = "unknown";
readQuery[4] = shardingNode;
readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
return readQuery;
}
private String[] genTraceRecord(String operation) {
return genTraceRecord(operation, "-", "-");
}
private String[] genTraceRecord(String operation, long start, long end) {
return genTraceRecord(operation, start, end, "-", "-");
}
private String[] genTraceRecord(String operation, long start, long end, String shardingNode, String ref) {
String[] readQuery = new String[6];
readQuery[0] = operation;
readQuery[1] = nanoToMilliSecond(start - requestStart.getTimestamp());
readQuery[2] = nanoToMilliSecond(end - requestStart.getTimestamp());
readQuery[3] = nanoToMilliSecond(end - start);
readQuery[4] = shardingNode;
readQuery[5] = ref.replaceAll("[\\t\\n\\r]", " ");
return readQuery;
}
private void printNoResultDebug(Map<String, TraceRecord> fetchStartRecordMap, Map<String, TraceRecord> fetchEndRecordMap) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs for connection");
if (fetchStartRecordMap != null) {
LOGGER.debug("fetchStartRecordMap size is " + fetchStartRecordMap.size());
}
if (fetchEndRecordMap != null) {
LOGGER.debug("fetchEndRecordMap size is " + fetchEndRecordMap.size());
}
}
}
private String[] genLogRecord(String operation, long start, long end) {
String[] readQuery = new String[2];
readQuery[0] = operation;
@@ -699,7 +722,16 @@ public class TraceResult implements Cloneable {
}
public double getOverAllMilliSecond() {
return (double) (veryEnd - veryStart) / 1000000;
return (double) (requestEnd.getTimestamp() - requestStart.getTimestamp()) / 1000000;
}
public String getOverAllSecond() {
return nanoToSecond(requestEnd.getTimestamp() - requestStart.getTimestamp());
}
private String nanoToMilliSecond(long nano) {
double milliSecond = (double) nano / 1000000;
return String.valueOf(milliSecond);
}
private String nanoToSecond(long nano) {
@@ -707,12 +739,19 @@ public class TraceResult implements Cloneable {
return String.format("%.6f", milliSecond);
}
public String getOverAllSecond() {
return nanoToSecond(veryEnd - veryStart);
public RouteResultsetNode[] getShardingNodes() {
return shardingNodes;
}
public SqlTraceType getType() {
if (this.type == null) {
return SqlTraceType.SIMPLE_QUERY;
}
return this.type;
}
@Override
public Object clone() {
public TraceResult clone() {
TraceResult tr;
try {
tr = (TraceResult) super.clone();

View File

@@ -482,7 +482,7 @@ public abstract class FrontendService<T extends UserConfig> extends AbstractServ
public void afterConsumed(ServiceTask executeTask) {
if (needDelayed) {
beforeWriteFinish(WriteFlags.QUERY_END);
beforeWriteFinish(WriteFlags.QUERY_END, ResultFlag.OTHER);
afterWriteFinish(WriteFlags.QUERY_END);
lastConsumedTask = new DelayedServiceTask(executeTask);
needDelayed = false;

View File

@@ -527,7 +527,7 @@ public class ShardingService extends BusinessService<ShardingUserConfig> {
@Override
public void beforeWriteFinish(@NotNull EnumSet<WriteFlag> writeFlags) {
public void beforeWriteFinish(@NotNull EnumSet<WriteFlag> writeFlags, ResultFlag resultFlag) {
for (BackendConnection backendConnection : session.getTargetMap().values()) {
TraceManager.sessionFinish(backendConnection.getBackendService());
}
@@ -542,7 +542,7 @@ public class ShardingService extends BusinessService<ShardingUserConfig> {
} else if (writeFlags.contains(WriteFlag.END_OF_SESSION)) {
TraceManager.sessionFinish(this);
}
session.setStageFinished();
session.setResponseTime((resultFlag == ResultFlag.OK || resultFlag == ResultFlag.EOF_ROW));
if (session.isDiscard() || session.isKilled()) {
session.setKilled(false);
session.setDiscard(false);

View File

@@ -16,6 +16,7 @@ import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.ResetConnectionPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.plan.common.field.FieldUtil;
import com.actiontech.dble.server.variables.MysqlVariable;
@@ -149,7 +150,7 @@ public class SetTestJob implements ResponseHandler, Runnable {
ResetConnHandler handler = new ResetConnHandler();
responseService.setResponseHandler(handler);
responseService.setComplexQuery(true);
responseService.write(ResetConnectionPacket.RESET, WriteFlags.QUERY_END);
responseService.write(ResetConnectionPacket.RESET, WriteFlags.QUERY_END, ResultFlag.OK);
}
}
@@ -194,7 +195,7 @@ public class SetTestJob implements ResponseHandler, Runnable {
ResetConnHandler handler = new ResetConnHandler();
responseService.setResponseHandler(handler);
responseService.setComplexQuery(true);
responseService.write(ResetConnectionPacket.RESET, WriteFlags.QUERY_END);
responseService.write(ResetConnectionPacket.RESET, WriteFlags.QUERY_END, ResultFlag.EOF_ROW);
}
}

View File

@@ -15,6 +15,7 @@ import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.net.service.ResultFlag;
import com.actiontech.dble.net.service.WriteFlags;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.parser.ServerParse;
@@ -98,7 +99,7 @@ public class TransformSQLJob implements ResponseHandler, Runnable {
@Override
public void okResponse(byte[] ok, @NotNull AbstractService service) {
this.managerService.write(ok, WriteFlags.QUERY_END);
this.managerService.write(ok, WriteFlags.QUERY_END, ResultFlag.OK);
connection.release();
}
@@ -119,7 +120,7 @@ public class TransformSQLJob implements ResponseHandler, Runnable {
@Override
public void rowEofResponse(byte[] eof, boolean isLeft, @NotNull AbstractService service) {
managerService.write(eof, WriteFlags.QUERY_END);
managerService.write(eof, WriteFlags.QUERY_END, ResultFlag.EOF_ROW);
connection.release();
}
@@ -133,7 +134,7 @@ public class TransformSQLJob implements ResponseHandler, Runnable {
}
private void writeError(byte[] err) {
managerService.write(err, WriteFlags.SESSION_END);
managerService.write(err, WriteFlags.SESSION_END, ResultFlag.ERROR);
if (connection != null) {
connection.release();
}