ATK-4339&ATK-4341 :support slow log

This commit is contained in:
dcy
2025-04-21 16:54:22 +08:00
committed by Rico
parent ae3cad70ab
commit 89b77b7703
20 changed files with 648 additions and 33 deletions
@@ -180,6 +180,17 @@ public final class SystemConfig {
//unit: ms
private long releaseTimeout = 10L;
private int appendTraceId = 1;
public int getAppendTraceId() {
return appendTraceId;
}
public void setAppendTraceId(int appendTraceId) {
this.appendTraceId = appendTraceId;
}
public int getEnableAsyncRelease() {
return enableAsyncRelease;
}
@@ -7,21 +7,21 @@ package com.actiontech.dble.log.slow;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.route.util.RouterUtil;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.server.trace.ITraceResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class SlowQueryLogEntry {
private TraceResult trace;
private ITraceResult trace;
private long timeStamp;
private String sql;
private UserName user;
private String clientIp;
private long connID;
SlowQueryLogEntry(String sql, TraceResult traceResult, UserName user, String clientIp, long connID) {
SlowQueryLogEntry(String sql, ITraceResult traceResult, UserName user, String clientIp, long connID) {
this.timeStamp = System.currentTimeMillis();
this.sql = RouterUtil.getFixedSql(sql);
this.trace = traceResult;
@@ -9,8 +9,8 @@ import com.actiontech.dble.btrace.provider.GeneralProvider;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.DailyRotateLogStore;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.server.trace.ITraceResult;
import com.actiontech.dble.services.BusinessService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,9 +136,9 @@ public class SlowQueryLogProcessor extends Thread {
};
}
public void putSlowQueryLog(ShardingService service, TraceResult log) {
if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
public void putSlowQueryLog(BusinessService service, ITraceResult log, String executeSql) {
if (log.isCompleted() && log.getOverAllMilliSecond() >= SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
final boolean enQueue = queue.offer(logEntry);
if (!enQueue) {
//abort
@@ -147,4 +147,15 @@ public class SlowQueryLogProcessor extends Thread {
}
}
}
public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String executeSql) {
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
final boolean enQueue = queue.offer(logEntry);
if (!enQueue) {
//abort
String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}";
LOGGER.warn(errorMsg);
}
}
}
@@ -19,6 +19,9 @@ public final class ManagerParseOnOff {
public static final int CUSTOM_MYSQL_HA = 3;
public static final int CAP_CLIENT_FOUND_ROWS = 4;
public static final int APPEND_TRACE_ID = 5;
public static int parse(String stmt, int offset) {
int i = offset;
for (; i < stmt.length(); i++) {
@@ -64,6 +67,9 @@ public final class ManagerParseOnOff {
if (prefix.startsWith("ALERT") && (stmt.length() == offset + 5 || ParseUtil.isEOF(stmt, offset + 5))) {
return ALERT;
}
if (prefix.startsWith("APPENDTRACEID") && (stmt.length() == offset + 13 || ParseUtil.isEOF(stmt, offset + 13))) {
return APPEND_TRACE_ID;
}
}
return OTHER;
}
@@ -3,9 +3,15 @@ package com.actiontech.dble.rwsplit;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.server.SessionStage;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.server.trace.RwTraceResult;
import com.actiontech.dble.server.trace.TraceRecord;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.rwsplit.*;
import com.actiontech.dble.singleton.RouteService;
import com.actiontech.dble.util.StringUtil;
@@ -16,6 +22,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RWSplitNonBlockingSession {
@@ -24,6 +32,9 @@ public class RWSplitNonBlockingSession {
private volatile BackendConnection conn;
private final RWSplitService rwSplitService;
private PhysicalDbGroup rwGroup;
private volatile RwTraceResult traceResult = new RwTraceResult();
private volatile SessionStage sessionStage = SessionStage.Init;
public RWSplitNonBlockingSession(RWSplitService service) {
this.rwSplitService = service;
@@ -62,6 +73,10 @@ public class RWSplitNonBlockingSession {
if (handler == null) return;
PhysicalDbInstance instance = rwGroup.rwSelect(canRunOnMaster(master), isWriteStatistical(writeStatistical), localRead);
checkDest(!instance.isReadInstance());
endRoute();
setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY);
setTraceSimpleHandler((ResponseHandler) handler);
traceResult.setDBInstance(instance);
instance.getConnection(rwSplitService.getSchema(), handler, null, false);
} catch (IOException e) {
LOGGER.warn("select conn error", e);
@@ -78,6 +93,10 @@ public class RWSplitNonBlockingSession {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select bind conn[id={}]", conn.getId());
}
endRoute();
setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY);
setTraceSimpleHandler(handler);
traceResult.setDBInstance((PhysicalDbInstance) conn.getInstance());
// for ps needs to send master
if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) {
long statementId = ByteUtil.readUB4(originPacket, 5);
@@ -204,4 +223,105 @@ public class RWSplitNonBlockingSession {
public BackendConnection getConn() {
return conn;
}
public void setRequestTime() {
sessionStage = SessionStage.Read_SQL;
long requestTime = 0;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
requestTime = System.nanoTime();
traceResult.setVeryStartPrepare(requestTime);
}
}
public void startProcess() {
sessionStage = SessionStage.Parse_SQL;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime()));
}
}
public void endParse() {
sessionStage = SessionStage.Route_Calculation;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.ready();
// traceResult.setRouteStart(new TraceRecord(System.nanoTime()));
}
}
public void endRoute() {
sessionStage = SessionStage.Prepare_to_Push;
}
public void setPreExecuteEnd(RwTraceResult.SqlTraceType type) {
sessionStage = SessionStage.Execute_SQL;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setType(type);
traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime()));
traceResult.clearConnReceivedMap();
traceResult.clearConnFlagMap();
}
}
public void setTraceSimpleHandler(ResponseHandler simpleHandler) {
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
traceResult.setSimpleHandler(simpleHandler);
}
}
public void setResponseTime(boolean isSuccess) {
sessionStage = SessionStage.Finished;
long responseTime = 0;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
responseTime = System.nanoTime();
traceResult.setVeryEnd(responseTime);
if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) {
SlowQueryLog.getInstance().putSlowQueryLog(this.rwSplitService, (RwTraceResult) traceResult);
traceResult = new RwTraceResult();
}
}
}
public void setBackendResponseEndTime(MySQLResponseService service) {
sessionStage = SessionStage.First_Node_Fetched_Result;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
TraceRecord record = new TraceRecord(System.nanoTime());
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
String key = String.valueOf(service.getConnection().getId());
connMap.put(key, record);
traceResult.addToConnFinishedMap(responseHandler, connMap);
}
}
}
public void setBackendResponseTime(MySQLResponseService service) {
sessionStage = SessionStage.Fetching_Result;
long responseTime = 0;
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
ResponseHandler responseHandler = service.getResponseHandler();
String key = String.valueOf(service.getConnection().getId());
if (responseHandler != null && traceResult.addToConnFlagMap(key) == null) {
responseTime = System.nanoTime();
TraceRecord record = new TraceRecord(responseTime);
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
connMap.put(key, record);
traceResult.addToConnReceivedMap(responseHandler, connMap);
}
}
}
}
@@ -7,8 +7,8 @@ package com.actiontech.dble.server.status;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.slow.SlowQueryLogProcessor;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.server.trace.ITraceResult;
import com.actiontech.dble.services.BusinessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +84,11 @@ public final class SlowQueryLog {
this.flushSize = flushSize;
}
public void putSlowQueryLog(ShardingService service, TraceResult log) {
processor.putSlowQueryLog(service, log);
public void putSlowQueryLog(BusinessService service, ITraceResult log) {
processor.putSlowQueryLog(service, log, service.getExecuteSql());
}
public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String sql) {
processor.putSlowQueryLogForce(service, log, sql);
}
}
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016-2023 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.server.trace;
import java.util.List;
/**
* @author dcy
* Create Date: 2025-04-17
*/
public interface ITraceResult {
public enum SqlTraceType {
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, RWSPLIT_QUERY;
}
boolean isCompleted();
RwTraceResult.SqlTraceType getType();
List<String[]> genLogResult();
double getOverAllMilliSecond();
String getOverAllSecond();
}
@@ -0,0 +1,275 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.server.trace;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class RwTraceResult implements Cloneable, ITraceResult {
private static final Logger LOGGER = LoggerFactory.getLogger(RwTraceResult.class);
// private boolean prepareFinished = false;
private long veryStartPrepare;
private long veryStart;
private TraceRecord requestStartPrepare;
private TraceRecord requestStart;
private TraceRecord parseStartPrepare; //requestEnd
private TraceRecord parseStart; //requestEnd
// private TraceRecord routeStart; //parseEnd
// private TraceRecord preExecuteStart; //routeEnd
private TraceRecord preExecuteEnd;
private ResponseHandler simpleHandler = null;
private ConcurrentMap<String, Boolean> connFlagMap = new ConcurrentHashMap<>();
private ConcurrentMap<ResponseHandler, Map<String, TraceRecord>> connReceivedMap = new ConcurrentHashMap<>();
private ConcurrentMap<ResponseHandler, Map<String, TraceRecord>> connFinishedMap = new ConcurrentHashMap<>();
private ConcurrentMap<ResponseHandler, TraceRecord> recordStartMap = new ConcurrentHashMap<>();
private ConcurrentMap<ResponseHandler, TraceRecord> recordEndMap = new ConcurrentHashMap<>();
private long veryEnd;
private SqlTraceType type = SqlTraceType.RWSPLIT_QUERY;
private PhysicalDbInstance dbInstance = null;
public void setVeryStartPrepare(long veryStartPrepare) {
// prepareFinished = false;
this.veryStartPrepare = veryStartPrepare;
this.requestStartPrepare = new TraceRecord(veryStartPrepare);
}
public void setDBInstance(PhysicalDbInstance dbInstance) {
this.dbInstance = dbInstance;
}
public void setParseStartPrepare(TraceRecord parseStartPrepare) {
this.parseStartPrepare = parseStartPrepare;
}
public void setPreExecuteEnd(TraceRecord preExecuteEnd) {
this.preExecuteEnd = preExecuteEnd;
}
public void setSimpleHandler(ResponseHandler simpleHandler) {
this.simpleHandler = simpleHandler;
}
public void setType(SqlTraceType type) {
this.type = type;
}
public Boolean addToConnFlagMap(String item) {
return connFlagMap.putIfAbsent(item, true);
}
public void clearConnFlagMap() {
connFlagMap.clear();
}
public void addToConnReceivedMap(ResponseHandler responseHandler, Map<String, TraceRecord> connMap) {
Map<String, TraceRecord> existReceivedMap = connReceivedMap.putIfAbsent(responseHandler, connMap);
if (existReceivedMap != null) {
existReceivedMap.putAll(connMap);
}
}
public void clearConnReceivedMap() {
connReceivedMap.clear();
}
public void addToConnFinishedMap(ResponseHandler responseHandler, Map<String, TraceRecord> connMap) {
Map<String, TraceRecord> existReceivedMap = connFinishedMap.putIfAbsent(responseHandler, connMap);
if (existReceivedMap != null) {
existReceivedMap.putAll(connMap);
}
}
public void addToRecordStartMap(ResponseHandler handler, TraceRecord traceRecord) {
recordStartMap.putIfAbsent(handler, traceRecord);
}
public void addToRecordEndMap(ResponseHandler handler, TraceRecord traceRecord) {
recordEndMap.putIfAbsent(handler, traceRecord);
}
public void setVeryEnd(long veryEnd) {
this.veryEnd = veryEnd;
}
public void ready() {
// prepareFinished = true;
clear();
veryStart = veryStartPrepare;
requestStart = requestStartPrepare;
parseStart = parseStartPrepare;
veryStartPrepare = 0;
requestStartPrepare = null;
parseStartPrepare = null;
}
private void clear() {
veryStart = 0;
requestStart = null;
parseStart = null;
preExecuteEnd = null;
this.type = null;
simpleHandler = null;
connFlagMap.clear();
for (Map<String, TraceRecord> connReceived : connReceivedMap.values()) {
connReceived.clear();
}
connReceivedMap.clear();
for (Map<String, TraceRecord> connReceived : connFinishedMap.values()) {
connReceived.clear();
}
connFinishedMap.clear();
recordStartMap.clear();
recordEndMap.clear();
veryEnd = 0;
}
@Override
public boolean isCompleted() {
return veryStart != 0 && veryEnd != 0 && connFlagMap.size() != 0 && connReceivedMap.size() == connFinishedMap.size() && recordStartMap.size() == recordEndMap.size();
}
@Override
public SqlTraceType getType() {
return this.type;
}
@Override
public List<String[]> genLogResult() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("start genLogResult");
}
if (!isCompleted()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("collect info not in pairs,veryEnd:" + veryEnd + ",connFlagMap.size:" + connFlagMap.size() +
",connReceivedMap.size:" + connReceivedMap.size() + ",connFinishedMap.size:" + connFinishedMap.size() +
",recordStartMap.size:" + connReceivedMap.size() + ",recordEndMap.size:" + connFinishedMap.size());
}
return null;
}
List<String[]> lst = new ArrayList<>();
lst.add(genLogRecord("Read_SQL", requestStart.getTimestamp(), parseStart.getTimestamp()));
lst.add(genLogRecord("Prepare_Push", parseStart.getTimestamp(), preExecuteEnd.getTimestamp()));
if (simpleHandler != null) {
if (genSimpleLogs(lst)) return null;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("not support trace this query");
}
return null;
}
lst.add(genLogRecord("Group_Name", dbInstance.getDbGroup().getGroupName()));
lst.add(genLogRecord("Instance_Name", dbInstance.getName()));
lst.add(genLogRecord("Is_Master", String.valueOf(!dbInstance.isReadInstance())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("end genLogResult");
}
return lst;
}
private boolean genSimpleLogs(List<String[]> lst) {
Map<String, TraceRecord> connFetchStartMap = connReceivedMap.get(simpleHandler);
Map<String, TraceRecord> connFetchEndMap = connFinishedMap.get(simpleHandler);
List<String[]> executeList = new ArrayList<>(connFetchStartMap.size());
List<String[]> fetchList = new ArrayList<>(connFetchStartMap.size());
long minFetchStart = Long.MAX_VALUE;
long maxFetchEnd = 0;
for (Map.Entry<String, TraceRecord> fetchStart : connFetchStartMap.entrySet()) {
TraceRecord fetchStartRecord = fetchStart.getValue();
minFetchStart = Math.min(minFetchStart, fetchStartRecord.getTimestamp());
executeList.add(genLogRecord("First_Result_Fetch", preExecuteEnd.getTimestamp(), fetchStartRecord.getTimestamp()));
TraceRecord fetchEndRecord = connFetchEndMap.get(fetchStart.getKey());
if (fetchEndRecord == null) {
LOGGER.debug("connection fetchEndRecord is null ");
return true;
}
fetchList.add(genLogRecord("Last_Result_Fetch", fetchStartRecord.getTimestamp(), fetchEndRecord.getTimestamp()));
maxFetchEnd = Math.max(maxFetchEnd, fetchEndRecord.getTimestamp());
}
lst.addAll(executeList);
lst.addAll(fetchList);
lst.add(genLogRecord("Write_Client", minFetchStart, veryEnd));
return false;
}
private String[] genLogRecord(String operation, long start, long end) {
String[] readQuery = new String[2];
readQuery[0] = operation;
readQuery[1] = nanoToSecond(end - start);
return readQuery;
}
private String[] genLogRecord(String operation, String value) {
String[] readQuery = new String[2];
readQuery[0] = operation;
readQuery[1] = value;
return readQuery;
}
@Override
public double getOverAllMilliSecond() {
return (double) (veryEnd - veryStart) / 1000000;
}
private String nanoToSecond(long nano) {
double milliSecond = (double) nano / 1000000000;
return String.format("%.6f", milliSecond);
}
@Override
public String getOverAllSecond() {
return nanoToSecond(veryEnd - veryStart);
}
@Override
public Object clone() {
RwTraceResult tr;
try {
tr = (RwTraceResult) super.clone();
tr.simpleHandler = this.simpleHandler;
tr.connFlagMap = new ConcurrentHashMap<>();
tr.connFlagMap.putAll(this.connFlagMap);
tr.connReceivedMap = new ConcurrentHashMap<>();
for (Map.Entry<ResponseHandler, Map<String, TraceRecord>> item : connReceivedMap.entrySet()) {
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
connMap.putAll(item.getValue());
tr.connReceivedMap.put(item.getKey(), connMap);
}
tr.connFinishedMap = new ConcurrentHashMap<>();
for (Map.Entry<ResponseHandler, Map<String, TraceRecord>> item : connFinishedMap.entrySet()) {
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
connMap.putAll(item.getValue());
tr.connFinishedMap.put(item.getKey(), connMap);
}
tr.recordStartMap = new ConcurrentHashMap<>();
tr.recordStartMap.putAll(this.recordStartMap);
tr.recordEndMap = new ConcurrentHashMap<>();
tr.recordEndMap.putAll(this.recordEndMap);
return tr;
} catch (Exception e) {
LOGGER.warn("clone TraceResult error", e);
throw new AssertionError(e.getMessage());
}
}
}
@@ -20,12 +20,9 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TraceResult implements Cloneable {
public class TraceResult implements Cloneable, ITraceResult {
public enum SqlTraceType {
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY;
}
private static final Logger LOGGER = LoggerFactory.getLogger(TraceResult.class);
private boolean prepareFinished = false;
@@ -573,6 +570,7 @@ public class TraceResult implements Cloneable {
return String.valueOf(milliSecond);
}
@Override
public boolean isCompleted() {
return veryStart != 0 && veryEnd != 0 && connFlagMap.size() != 0 && connReceivedMap.size() == connFinishedMap.size() && recordStartMap.size() == recordEndMap.size();
}
@@ -6,12 +6,9 @@
package com.actiontech.dble.services.manager.handler;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.response.OnOffAlert;
import com.actiontech.dble.services.manager.response.OnOffCapClientFoundRows;
import com.actiontech.dble.services.manager.response.OnOffCustomMySQLHa;
import com.actiontech.dble.services.manager.response.OnOffSlowQueryLog;
import com.actiontech.dble.route.parser.ManagerParseOnOff;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.response.*;
public final class DisableHandler {
private DisableHandler() {
@@ -32,6 +29,9 @@ public final class DisableHandler {
case ManagerParseOnOff.CAP_CLIENT_FOUND_ROWS:
OnOffCapClientFoundRows.execute(service, false);
break;
case ManagerParseOnOff.APPEND_TRACE_ID:
OnOffAppendTraceId.execute(service, false);
break;
default:
service.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
}
@@ -7,10 +7,7 @@ package com.actiontech.dble.services.manager.handler;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.response.OnOffAlert;
import com.actiontech.dble.services.manager.response.OnOffCapClientFoundRows;
import com.actiontech.dble.services.manager.response.OnOffCustomMySQLHa;
import com.actiontech.dble.services.manager.response.OnOffSlowQueryLog;
import com.actiontech.dble.services.manager.response.*;
import com.actiontech.dble.route.parser.ManagerParseOnOff;
public final class EnableHandler {
@@ -32,6 +29,9 @@ public final class EnableHandler {
case ManagerParseOnOff.CAP_CLIENT_FOUND_ROWS:
OnOffCapClientFoundRows.execute(service, true);
break;
case ManagerParseOnOff.APPEND_TRACE_ID:
OnOffAppendTraceId.execute(service, true);
break;
default:
service.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
}
@@ -0,0 +1,43 @@
package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
import com.actiontech.dble.singleton.AppendTraceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public final class OnOffAppendTraceId {
private static final Logger LOGGER = LoggerFactory.getLogger(OnOffAppendTraceId.class);
private OnOffAppendTraceId() {
}
public static void execute(ManagerService service, boolean isOn) {
String onOffStatus = isOn ? "enable" : "disable";
try {
WriteDynamicBootstrap.getInstance().changeValue("appendTraceId", isOn ? "1" : "0");
} catch (IOException e) {
String msg = onOffStatus + " appendTraceId failed";
LOGGER.warn(String.valueOf(service) + " " + msg, e);
service.writeErrMessage(ErrorCode.ER_YES, msg);
return;
}
AppendTraceId.getInstance().setValue(isOn ? 1 : 0);
LOGGER.info(String.valueOf(service) + " " + onOffStatus + " appendTraceId success by manager");
OkPacket ok = new OkPacket();
ok.setPacketId(1);
ok.setAffectedRows(1);
ok.setServerStatus(2);
ok.setMessage((onOffStatus + " appendTraceId success").getBytes());
ok.write(service.getConnection());
}
}
@@ -17,6 +17,7 @@ import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.ServiceTask;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.services.BusinessService;
@@ -61,6 +62,7 @@ public class MySQLResponseService extends VariablesService {
private volatile Object attachment;
private volatile NonBlockingSession session;
private volatile RWSplitNonBlockingSession session2;
private volatile boolean metaDataSynced = true;
@@ -209,6 +211,11 @@ public class MySQLResponseService extends VariablesService {
return false;
}
session.setBackendResponseTime(this);
} else if (session2 != null) {
// if (session2.isKilled()) {
// return false;
// }
session2.setBackendResponseTime(this);
}
return true;
}
@@ -783,6 +790,17 @@ public class MySQLResponseService extends VariablesService {
public void setSession(NonBlockingSession session) {
this.session = session;
this.session2 = null;
}
public void setSession2(RWSplitNonBlockingSession session2) {
this.session = null;
this.session2 = session2;
}
public RWSplitNonBlockingSession getSession2() {
return session2;
}
public ResponseHandler getResponseHandler() {
@@ -42,6 +42,7 @@ public class PSHandler implements ResponseHandler, PreparedResponseHandler {
public void connectionAcquired(BackendConnection conn) {
MySQLResponseService mysqlService = conn.getBackendService();
mysqlService.setResponseHandler(this);
mysqlService.setSession2(rwSplitService.getSession());
mysqlService.execute(rwSplitService, holder.getPrepareOrigin());
}
@@ -50,6 +50,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
public void execute(final BackendConnection conn) {
MySQLResponseService mysqlService = conn.getBackendService();
mysqlService.setResponseHandler(this);
mysqlService.setSession2(rwSplitService.getSession());
if (originPacket != null) {
mysqlService.execute(rwSplitService, originPacket);
} else if (isHint) {
@@ -106,7 +107,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
MySQLResponseService mysqlService = (MySQLResponseService) service;
boolean executeResponse = mysqlService.syncAndExecute();
if (executeResponse) {
rwSplitService.getSession().setBackendResponseEndTime((MySQLResponseService) service);
final OkPacket packet = new OkPacket();
packet.read(data);
if ((packet.getServerStatus() & HAS_MORE_RESULTS) == 0) {
@@ -118,6 +119,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
synchronized (this) {
if (!write2Client) {
rwSplitService.getSession().setResponseTime(true);
data[3] = (byte) rwSplitService.nextPacketId();
frontedConnection.write(data);
if ((packet.getServerStatus() & HAS_MORE_RESULTS) == 0) {
@@ -157,6 +159,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
@Override
public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service) {
synchronized (this) {
rwSplitService.getSession().setBackendResponseEndTime((MySQLResponseService) service);
if (!write2Client) {
eof[3] = (byte) rwSplitService.nextPacketId();
if ((eof[7] & HAS_MORE_RESULTS) == 0) {
@@ -180,6 +183,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
buffer = null;
if ((eof[7] & HAS_MORE_RESULTS) == 0) {
write2Client = true;
rwSplitService.getSession().setResponseTime(true);
}
}
}
@@ -217,6 +221,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
@Override
public void preparedOkResponse(byte[] ok, List<byte[]> fields, List<byte[]> params, MySQLResponseService service) {
synchronized (this) {
rwSplitService.getSession().setBackendResponseEndTime((MySQLResponseService) service);
if (buffer == null) {
buffer = frontedConnection.allocate();
}
@@ -239,6 +244,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
callback.callback(true, ok, rwSplitService);
}
frontedConnection.write(buffer);
service.getSession2().setResponseTime(true);
write2Client = true;
buffer = null;
}
@@ -3,17 +3,22 @@ package com.actiontech.dble.services.rwsplit;
import com.actiontech.dble.config.Capabilities;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.handler.FrontendQueryHandler;
import com.actiontech.dble.net.mysql.CommandPacket;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.server.ServerQueryHandler;
import com.actiontech.dble.server.handler.SetHandler;
import com.actiontech.dble.server.handler.UseHandler;
import com.actiontech.dble.server.parser.RwSplitServerParse;
import com.actiontech.dble.singleton.AppendTraceId;
import com.actiontech.dble.singleton.RouteService;
import com.actiontech.dble.singleton.TraceManager;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
public class RWSplitQueryHandler implements FrontendQueryHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryHandler.class);
@@ -41,11 +46,28 @@ public class RWSplitQueryHandler implements FrontendQueryHandler {
return;
}
int rs = RwSplitServerParse.parse(sql);
if (AppendTraceId.getInstance().isEnable()) {
sql = String.format("/*+ trace_id=%d-%d */ %s", session.getService().getConnection().getId(), session.getService().getSqlUniqueId().incrementAndGet(), sql);
}
session.getService().setExecuteSql(sql);
session.endParse();
int hintLength = RouteService.isHintSql(sql);
int sqlType = rs & 0xff;
if (hintLength >= 0) {
session.executeHint(sqlType, sql, null);
} else {
if (AppendTraceId.getInstance().isEnable()) {
CommandPacket packet = new CommandPacket();
final byte COM_QUERY = 0x3;
packet.setCommand(COM_QUERY);
packet.setArg(sql.getBytes());
packet.setPacketId(session.getService().getExecuteSqlBytes()[3]);
ByteArrayOutputStream out = new ByteArrayOutputStream();
packet.write(out);
session.getService().setExecuteSqlBytes(out.toByteArray());
}
if (sqlType != RwSplitServerParse.START && sqlType != RwSplitServerParse.BEGIN &&
sqlType != RwSplitServerParse.COMMIT && sqlType != RwSplitServerParse.ROLLBACK && sqlType != RwSplitServerParse.SET) {
session.getService().singleTransactionsCount();
@@ -6,6 +6,7 @@ import com.actiontech.dble.backend.mysql.MySQLMessage;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.mysql.CommandPacket;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.net.service.AuthResultInfo;
import com.actiontech.dble.net.service.ServiceTask;
@@ -13,15 +14,20 @@ import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.response.Heartbeat;
import com.actiontech.dble.server.response.Ping;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.server.trace.RwTraceResult;
import com.actiontech.dble.server.variables.MysqlVariable;
import com.actiontech.dble.services.BusinessService;
import com.actiontech.dble.singleton.AppendTraceId;
import com.actiontech.dble.singleton.TsQueriesCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -44,6 +50,8 @@ public class RWSplitService extends BusinessService {
public static final int LOCK_READ = 2;
AtomicInteger sqlUniqueId = new AtomicInteger(1);
// prepare statement
private ConcurrentHashMap<Long, PreparedStatementHolder> psHolder = new ConcurrentHashMap<>();
@@ -89,11 +97,13 @@ public class RWSplitService extends BusinessService {
@Override
protected void taskToTotalQueue(ServiceTask task) {
session.setRequestTime();
DbleServer.getInstance().getFrontHandlerQueue().offer(task);
}
@Override
protected void handleInnerData(byte[] data) {
session.startProcess();
// if the statement is load data, directly push down
if (inLoadData) {
session.execute(true, data, (isSuccess, resp, rwSplitService) -> {
@@ -174,6 +184,9 @@ public class RWSplitService extends BusinessService {
try {
switchSchema = mm.readString(getCharset().getClient());
session.execute(true, data, (isSuccess, resp, rwSplitService) -> {
if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) {
SlowQueryLog.getInstance().putSlowQueryLogForce(this.session.getService(), new RwTraceResult(), "use " + switchSchema);
}
if (isSuccess) rwSplitService.setSchema(switchSchema);
});
} catch (UnsupportedEncodingException e) {
@@ -212,26 +225,45 @@ public class RWSplitService extends BusinessService {
sql = sql.substring(0, sql.length() - 1).trim();
}
sql = sql.trim();
final String finalSql = sql;
int rs = ServerParse.parse(sql);
int sqlType = rs & 0xff;
String tmpSql = sql;
byte[] tmpData = data;
if (AppendTraceId.getInstance().isEnable()) {
tmpSql = String.format("/*+ trace_id=%d-%d */ %s", session.getService().getConnection().getId(), getSqlUniqueId().incrementAndGet(), sql);
CommandPacket packet = new CommandPacket();
final byte COM_STMT_PREPARE = 0x16;
packet.setCommand(COM_STMT_PREPARE);
packet.setArg(tmpSql.getBytes());
packet.setPacketId(data[3]);
ByteArrayOutputStream out = new ByteArrayOutputStream();
packet.write(out);
tmpData = out.toByteArray();
}
final String finalSql = tmpSql;
setExecuteSql(finalSql);
final byte[] finalData = tmpData;
session.endParse();
switch (sqlType) {
case ServerParse.SELECT:
int rs2 = ServerParse.parseSpecial(sqlType, sql);
if (rs2 == LOCK_READ) {
session.execute(true, data, (isSuccess, resp, rwSplitService) -> {
session.execute(true, finalData, (isSuccess, resp, rwSplitService) -> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql));
psHolder.put(statementId, new PreparedStatementHolder(finalData, paramCount, true, finalSql));
}
}, false);
} else {
session.execute(null, data, (isSuccess, resp, rwSplitService) -> {
session.execute(null, finalData, (isSuccess, resp, rwSplitService) -> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql));
psHolder.put(statementId, new PreparedStatementHolder(finalData, paramCount, false, finalSql));
}
}, false);
}
@@ -241,7 +273,7 @@ public class RWSplitService extends BusinessService {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql));
psHolder.put(statementId, new PreparedStatementHolder(finalData, paramCount, true, finalSql));
}
});
break;
@@ -255,6 +287,10 @@ public class RWSplitService extends BusinessService {
session.execute(true, data, null);
}
public AtomicInteger getSqlUniqueId() {
return sqlUniqueId;
}
public RwSplitUserConfig getUserConfig() {
return (RwSplitUserConfig) userConfig;
}
@@ -293,6 +329,10 @@ public class RWSplitService extends BusinessService {
return executeSqlBytes;
}
public void setExecuteSqlBytes(byte[] executeSqlBytes) {
this.executeSqlBytes = executeSqlBytes;
}
public boolean isInPrepare() {
return inPrepare;
}
@@ -0,0 +1,30 @@
package com.actiontech.dble.singleton;
import com.actiontech.dble.config.model.SystemConfig;
public final class AppendTraceId {
private static final AppendTraceId INSTANCE = new AppendTraceId();
private volatile int value;
public static AppendTraceId getInstance() {
return INSTANCE;
}
public AppendTraceId() {
this.value = SystemConfig.getInstance().getAppendTraceId();
}
public boolean isEnable() {
return value == 1;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
}
@@ -144,6 +144,7 @@ public final class SystemParams {
params.add(new ParamInfo("flushSlowLogSize", SlowQueryLog.getInstance().getFlushSize() + "", "The max size for flushing log to disk, the default is 1000"));
params.add(new ParamInfo("enableAlert", AlertUtil.isEnable() + "", "enable or disable alert"));
params.add(new ParamInfo("capClientFoundRows", CapClientFoundRows.getInstance().isEnableCapClientFoundRows() + "", "Whether to turn on EOF_Packet to return found rows,The default value is false"));
params.add(new ParamInfo("appendTraceId", AppendTraceId.getInstance().getValue() + "", "append the trace id to the sql"));
return params;
}
+1 -1
View File
@@ -90,7 +90,7 @@
-DflushSlowLogSize=1000
# the threshold for judging if the query is slow , unit is millisecond
-DsqlSlowTime=100
-DappendTraceId=0
#-DenableAsyncRelease=1
#-DreleaseTimeout=10