Mirror of https://github.com/actiontech/dble.git (synced 2026-01-05 20:30:40 -06:00)
Merge pull request #3493 from actiontech/inner/1783
[inner-1783] adjust transaction's debug log
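The hunks below apply one pattern throughout: wrap LOGGER.debug(...) calls in an if (LOGGER.isDebugEnabled()) guard so that argument expressions (string concatenation, explicit toString2()-style summaries of sessions and connections) are never evaluated when debug logging is off. A minimal sketch of that idiom, assuming an SLF4J-style logger; the class and method names here are illustrative, not taken from the repository:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(DebugGuardSketch.class);

    void onHeartbeatOk(Object source, Object previousStatus) {
        // The guard skips the whole statement when DEBUG is disabled, so expensive
        // argument expressions (concatenation, summary-building helper calls) never run.
        // Plain object references passed to {} placeholders are already deferred by SLF4J.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("heartbeat to [{}] setOk, previous status is {}", source, previousStatus);
        }
    }
}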
@@ -232,7 +232,8 @@ public class MySQLHeartbeat {
 }
 //after the heartbeat changes from failure to success, it needs to be expanded immediately
 if (source.getTotalConnections() == 0 && !previousStatus.equals(MySQLHeartbeatStatus.INIT) && !previousStatus.equals(MySQLHeartbeatStatus.OK)) {
-LOGGER.debug("[updatePoolCapacity] heartbeat to [{}] setOk, previous status is {}", source, previousStatus);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("[updatePoolCapacity] heartbeat to [{}] setOk, previous status is {}", source, previousStatus);
 source.updatePoolCapacity();
 }
 if (isStop) {
@@ -163,10 +163,7 @@ public class FieldListHandler implements ResponseHandler {
 ShardingService shardingService = session.getShardingService();
 String errMsg = "errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage());
 if (responseService != null && !responseService.isFakeClosed()) {
-LOGGER.info("execute sql err:{}, con:{}, frontend host:{}/{}/{}", errMsg, responseService,
-shardingService.getConnection().getHost(),
-shardingService.getConnection().getLocalPort(),
-shardingService.getUser());
+LOGGER.info("execute sql err:{}, con:{}", errMsg, responseService);

 if (responseService.getConnection().isClosed()) {
 if (responseService.getAttachment() != null) {
@@ -187,7 +187,8 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
 // create new connection
 node.setRunOnSlave(rrs.getRunOnSlave());
 ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(node.getName());
-LOGGER.debug("[doConnection] node + " + node.toString());
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("[doConnection] node + " + node.toString());
 dn.syncGetConnection(dn.getDatabase(), session.getShardingService().isTxStart(), sessionAutocommit, node, this, node);
 }
@@ -184,10 +184,7 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
 protected void backConnectionErr(ErrorPacket errPkg, @Nullable MySQLResponseService service, boolean syncFinished) {
 ShardingService shardingService = session.getShardingService();
 String errMsg = "errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage());
-LOGGER.info("execute sql err:{}, con:{}, frontend host:{}/{}/{}", errMsg, service,
-shardingService.getConnection().getHost(),
-shardingService.getConnection().getLocalPort(),
-shardingService.getUser());
+LOGGER.info("execute sql err:{}, con:{}", errMsg, service);

 if (service != null && !service.isFakeClosed()) {
 if (service.getConnection().isClosed()) {
@@ -371,7 +371,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl
 protected boolean clearIfSessionClosed() {
 if (session.closed()) {
 if (LOGGER.isDebugEnabled()) {
-LOGGER.debug("session closed without execution,clear resources " + session);
+LOGGER.debug("session closed without execution, clear resources " + session);
 }
 String msg = "Current session closed";
 specialHandling(false, msg);
@@ -90,7 +90,9 @@ public class BaseSelectHandler extends BaseDMLHandler {

 @Override
 public void okResponse(byte[] ok, @NotNull AbstractService service) {
-LOGGER.debug("receive ok packet for sync context, service {}", service);
+if (LOGGER.isDebugEnabled()) {
+LOGGER.debug("receive ok packet for sync context, service {}", service);
+}
 ((MySQLResponseService) service).syncAndExecute();
 }
@@ -25,10 +25,12 @@ public class TransactionHandlerManager {
 private volatile String xaTxId;
 private TransactionHandler normalHandler;
 private TransactionHandler xaHandler;
+private NonBlockingSession session;

 public TransactionHandlerManager(NonBlockingSession session) {
 this.normalHandler = new NormalTransactionHandler(session);
 this.xaHandler = new XAHandler(session);
+this.session = session;
 }

 public String getSessionXaID() {
@@ -75,6 +77,9 @@ public class TransactionHandlerManager {
 }

 public void commit(TransactionCallback callback) {
+if (LOGGER.isDebugEnabled()) {
+LOGGER.debug("{} execute commit(), current {}", session.getShardingService().toString2(), session);
+}
 if (xaTxId != null) {
 xaHandler.commit(callback);
 } else {
@@ -83,6 +88,9 @@ public class TransactionHandlerManager {
 }

 public void syncImplicitCommit() throws SQLException {
+if (LOGGER.isDebugEnabled()) {
+LOGGER.debug("{} execute syncImplicitCommit(), current {}", session.getShardingService().toString2(), session);
+}
 if (xaTxId != null) {
 // implicit commit is not supported in XA transactions
 // xaHandler.syncImplicitCommit();
@@ -92,6 +100,9 @@ public class TransactionHandlerManager {
 }

 public void rollback(TransactionCallback callback) {
+if (LOGGER.isDebugEnabled()) {
+LOGGER.debug("{} execute rollback(), current {}", session.getShardingService().toString2(), session);
+}
 if (xaTxId != null) {
 xaHandler.rollback(callback);
 } else {
@@ -220,8 +220,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
 heartBeatHandler.ping(poolConfig.getConnectionHeartbeatTimeout());
 return;
 }

-LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
-
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
 conn.lazySet(STATE_NOT_IN_USE);
 // spin until a thread takes it or none are waiting
@@ -240,7 +240,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
 public void onCreateFail(PooledConnection conn, Throwable e) {
 if (conn == null || conn.getIsCreateFail().compareAndSet(false, true)) {
 if (conn != null) {
-LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
 }
 LOGGER.warn("create connection fail " + e.getMessage());
 totalConnections.decrementAndGet();
@@ -59,7 +59,8 @@ public class MemoryBufferMonitor {
 }
 final BufferPoolRecord record = monitorMap.remove(allocateAddress);
 if (record != null) {
-LOGGER.debug("removed buffer record ,address: {}, content:{}", allocateAddress, record);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("removed buffer record ,address: {}, content:{}", allocateAddress, record);
 }
 }
@@ -90,7 +91,8 @@ public class MemoryBufferMonitor {

 }
 final BufferPoolRecord record = bufferRecordBuilder.withAllocatedTime(System.currentTimeMillis()).withAllocateSize(allocateSize).build();
-LOGGER.debug("new buffer record ,address: {}, content:{}", allocateAddress, record);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("new buffer record ,address: {}, content:{}", allocateAddress, record);
 monitorMap.put(allocateAddress, record);

 } catch (Exception e) {
@@ -74,7 +74,8 @@ public class SchemaCheckMetaHandler extends AbstractSchemaMetaHandler {
 AlertUtil.alertSelfResolve(AlarmCode.TABLE_NOT_CONSISTENT_IN_MEMORY, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId),
 ToResolveContainer.TABLE_NOT_CONSISTENT_IN_MEMORY, tableId);
 }
-LOGGER.debug("checking table Table [" + tableMeta.getTableName() + "]");
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("checking table Table [" + tableMeta.getTableName() + "]");
 }
 }
@@ -172,7 +172,8 @@ public abstract class AbstractConnection implements Connection {
 FrontActiveRatioStat.getInstance().remove(this);
 closeSocket();
 if (isOnlyFrontTcpConnected() && (reason.contains("Connection reset by peer") || reason.contains("stream closed") || reason.contains("Broken pipe"))) {
-LOGGER.debug("connection id close for reason [{}] with connection {}", reason, this);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("connection id close for reason [{}] with {}", reason, this);
 } else {
 LOGGER.info("connection id close for reason [{}] with connection {}", reason, this);
 }
@@ -36,6 +36,8 @@ public class BackendConnection extends PooledConnection {
 private final int flowLowLevel;
 private volatile boolean backendWriteFlowControlled;

+private volatile String bindFront;
+
 public BackendConnection(NetworkChannel channel, SocketWR socketWR, ReadTimeStatusInstance instance, ResponseHandler handler, String schema) {
 super(channel, socketWR);
 this.instance = instance;
@@ -81,7 +83,8 @@ public class BackendConnection extends PooledConnection {
 @Override
 public void stopFlowControl(int currentWritingSize) {
 if (backendWriteFlowControlled && currentWritingSize <= flowLowLevel) {
-LOGGER.debug("This connection stop flow control, currentWritingSize= {},the connection info is {}", currentWritingSize, this);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This connection stop flow control, currentWritingSize= {},the connection info is {}", currentWritingSize, this);
 backendWriteFlowControlled = false;
 }
 }
@@ -96,7 +99,8 @@ public class BackendConnection extends PooledConnection {

 public void enableRead() {
 if (frontWriteFlowControlled) {
-LOGGER.debug("This connection enableRead because of flow control, the connection info is {}", this);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This connection enableRead because of flow control, the connection info is {}", this);
 socketWR.enableRead();
 frontWriteFlowControlled = false;
 }
@@ -104,7 +108,8 @@ public class BackendConnection extends PooledConnection {

 public void disableRead() {
 if (!frontWriteFlowControlled) {
-LOGGER.debug("This connection disableRead because of flow control, the connection info is {}", this);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This connection disableRead because of flow control, the connection info is {}", this);
 socketWR.disableRead();
 frontWriteFlowControlled = true;
 }
@@ -118,6 +123,7 @@ public class BackendConnection extends PooledConnection {
 } else {
 service.release();
 }
+setBindFront(null);
 }

 @Override
@@ -218,6 +224,10 @@ public class BackendConnection extends PooledConnection {
 this.threadId = threadId;
 }

+public void setBindFront(String bindFront) {
+this.bindFront = bindFront;
+}
+
 public MySQLResponseService getBackendService() {
 final AbstractService service = getService();
 return service instanceof MySQLResponseService ? (MySQLResponseService) service : null;
@@ -228,7 +238,12 @@ public class BackendConnection extends PooledConnection {
 }

 @Override
-public String toString() {
-return "BackendConnection[id = " + id + " host = " + host + " port = " + port + " localPort = " + localPort + " mysqlId = " + threadId + " db config = " + instance;
+public String toString() { // show all
+return "BackendConnection[id = " + id + " host = " + host + " port = " + port + " localPort = " + localPort + " mysqlId = " + threadId + " db config = " + instance + (bindFront != null ? ", currentBindFrontend = " + bindFront : "") + "]";
 }
+
+// not show 'currentBindFrontend ='
+public String toString2() {
+return "BackendConnection[id = " + id + " host = " + host + " port = " + port + " localPort = " + localPort + " mysqlId = " + threadId + " db config = " + instance + "]";
+}
 }
@@ -400,4 +400,17 @@ public class FrontendConnection extends AbstractConnection {
 isSkipCheck() + " isFlowControl = " + isFrontWriteFlowControlled() + " onlyTcpConnect = " +
 isOnlyFrontTcpConnected() + " ssl = " + (isUseSSL() ? sslName : "no") + "]";
 }
+
+public String getSimple() {
+StringBuilder s = new StringBuilder();
+s.append("id:");
+s.append(id);
+s.append("/");
+s.append(((FrontendService) getService()).getUser().getFullName());
+s.append("@");
+s.append(host);
+s.append(":");
+s.append(localPort);
+return s.toString();
+}
 }
@@ -47,7 +47,8 @@ public class FrontendCurrentRunnable implements FrontendRunnable {
 try {
 if (Thread.currentThread().isInterrupted()) {
 DbleServer.getInstance().getThreadUsedMap().remove(Thread.currentThread().getName());
-LOGGER.debug("interrupt thread:{},frontNormalTasks:{}", Thread.currentThread().toString(), frontNormalTasks);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("interrupt thread:{},frontNormalTasks:{}", Thread.currentThread().toString(), frontNormalTasks);
 break;
 }
 task = frontNormalTasks.poll();
@@ -42,7 +42,8 @@ public class WriteToBackendRunnable implements Runnable {
 while (true) {
 if (Thread.currentThread().isInterrupted()) {
 DbleServer.getInstance().getThreadUsedMap().remove(Thread.currentThread().getName());
-LOGGER.debug("interrupt thread:{},writeToBackendQueue:{}", Thread.currentThread().toString(), writeToBackendQueue);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("interrupt thread:{},writeToBackendQueue:{}", Thread.currentThread().toString(), writeToBackendQueue);
 break;
 }
 try {
@@ -78,7 +79,8 @@ public class WriteToBackendRunnable implements Runnable {
 }
 } catch (InterruptedException e) {
 DbleServer.getInstance().getThreadUsedMap().remove(Thread.currentThread().getName());
-LOGGER.debug("interrupt thread:{},concurrentBackQueue:{}", Thread.currentThread().toString(), writeToBackendQueue, e);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("interrupt thread:{},concurrentBackQueue:{}", Thread.currentThread().toString(), writeToBackendQueue, e);
 break;
 } catch (Throwable t) {
 LOGGER.warn("Unknown error:", t);
@@ -241,7 +241,8 @@ public final class RW implements Runnable {
 } catch (Exception e) {
 //todo confirm what happens when register is called
 if ((c.isOnlyFrontTcpConnected() && e instanceof IOException)) {
-LOGGER.debug("{} register err", c, e);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("{} register err", c, e);
 } else {
 LOGGER.warn("{} register err", c, e);
 }
@@ -220,8 +220,8 @@ public abstract class AbstractService extends VariablesService implements Servic
 connection.closeImmediately(Strings.join(closedReasons, ';'));
 return;
 } else {
-
-LOGGER.debug("conn graceful close should delay. {}.", this);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("conn graceful close should delay. {}.", this);
 task.setDelayedTimes(task.getDelayedTimes() + 1);
 /*
 delayed, push back to queue.
@@ -346,6 +346,7 @@ public final class RouteResultset implements Serializable {
 @Override
 public String toString() {
 StringBuilder s = new StringBuilder();
+s.append("rrs = [");
 s.append(srcStatement.length() > 1024 ? srcStatement.substring(0, 1024) + "..." : srcStatement).append(", route={");
 if (nodes != null) {
 for (int i = 0; i < nodes.length; ++i) {
@@ -353,7 +354,7 @@
 s.append(" -> ").append(nodes[i]);
 }
 }
-s.append("\n}");
+s.append("\n}]");
 return s.toString();
 }
@@ -220,7 +220,8 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults

 @Override
 public String toString() {
-StringBuilder sb = new StringBuilder(name);
+StringBuilder sb = new StringBuilder("rrsNode[");
+sb.append(name);
 sb.append('-').append(nodeRepeat).append('-');
 if (null != tableSet && !tableSet.isEmpty()) {
 sb.append("{" + tableSet.stream().collect(Collectors.joining(",")) + "}." + repeatTableIndex + "-");
@@ -229,6 +230,7 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
 sb.append(statement.length() <= 1024 ? statement : statement.substring(0, 1024) + "...");
 sb.append("}.");
 sb.append(multiplexNum.get());
+sb.append("]");
 return sb.toString();
 }
@@ -75,7 +75,8 @@ public class RWSplitNonBlockingSession extends Session {
 if (size <= con.getFlowLowLevel()) {
 con.enableRead();
 } else {
-LOGGER.debug("This front connection want to remove flow control, but mysql conn [{}]'s size [{}] is not lower the FlowLowLevel", con.getThreadId(), size);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This front connection want to remove flow control, but mysql conn [{}]'s size [{}] is not lower the FlowLowLevel", con.getThreadId(), size);
 }
 } else {
 con.enableRead();
@@ -305,7 +306,8 @@ public class RWSplitNonBlockingSession extends Session {
 LOGGER.warn("last conn is remaining, the session is {}, the backend conn is {}", rwSplitService.getConnection(), tmp);
 tmp.release();
 }
-LOGGER.debug("bind conn is {}", bindConn);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("bind conn is {}", bindConn);
 this.conn = bindConn;
 }
@@ -316,7 +318,8 @@ public class RWSplitNonBlockingSession extends Session {
 if (rwSplitService.isFlowControlled()) {
 releaseConnectionFromFlowControlled(tmp);
 }
-LOGGER.debug("safe unbind conn is {}", tmp);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("safe unbind conn is {}", tmp);
 tmp.release();
 }
 }
@@ -401,13 +401,13 @@ public class NonBlockingSession extends Session {
 TraceManager.log(ImmutableMap.of("route-result-set", rrs), traceObject);
 try {
 if (killed) {
+LOGGER.info("{} sql[{}] is killed.", getShardingService().toString2(), getShardingService().getExecuteSql());
 shardingService.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");
 return;
 }

 if (LOGGER.isDebugEnabled()) {
-StringBuilder s = new StringBuilder();
-LOGGER.debug(s.append(shardingService).append(rrs).toString() + " rrs ");
+LOGGER.debug("{} print current {}", shardingService.toString2(), rrs);
 }

 if (PauseShardingNodeManager.getInstance().getIsPausing().get() &&
@@ -746,7 +746,6 @@ public class NonBlockingSession extends Session {
 }


-
 public void unLockTable(String sql) {
 UnLockTablesHandler handler = new UnLockTablesHandler(this, this.shardingService.isAutocommit(), sql);
 handler.execute();
@@ -762,6 +761,9 @@
 needWaitFinished) {
 return;
 }
+if (LOGGER.isDebugEnabled()) {
+LOGGER.debug("terminate {}", this);
+}
 for (BackendConnection node : target.values()) {
 node.close("client closed or timeout killed");
 }
@@ -842,6 +844,7 @@
 }

 public void bindConnection(RouteResultsetNode key, BackendConnection conn) {
+conn.setBindFront(this.getSource().getSimple());
 target.put(key, conn);
 }
@@ -961,7 +964,6 @@
 }


-
 public void rowCountRolling() {
 rowCountLastSQL = rowCountCurrentSQL;
 rowCountCurrentSQL = -1;
@@ -1060,22 +1062,24 @@
 con.enableRead();
 iterator.remove();
 } else {
-LOGGER.debug("This front connection want to remove flow control, but mysql conn [{}]'s size [{}] is not lower the FlowLowLevel", con.getThreadId(), size);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This front connection want to remove flow control, but mysql conn [{}]'s size [{}] is not lower the FlowLowLevel", con.getThreadId(), size);
 }
 } else {
 con.enableRead();
 iterator.remove();
 }
 }
-
-LOGGER.debug("This front connection remove flow control, currentWritingSize= {} and now still have {} backend connections in flow control state, the front conn info :{} ", currentWritingSize, flowControlledTarget.size(), this.getSource());
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This front connection remove flow control, currentWritingSize= {} and now still have {} backend connections in flow control state, the front conn info :{} ", currentWritingSize, flowControlledTarget.size(), this.getSource());
 }
 }

 @Override
 public void startFlowControl(int currentWritingSize) {
 synchronized (flowControlledTarget) {
-LOGGER.debug("This front connection begins flow control, currentWritingSize= {},conn info:{}", currentWritingSize, this.getSource());
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This front connection begins flow control, currentWritingSize= {},conn info:{}", currentWritingSize, this.getSource());
 shardingService.getConnection().setFrontWriteFlowControlled(true);
 for (BackendConnection con : target.values()) {
 con.disableRead();
@@ -1094,7 +1098,8 @@
 con.getSocketWR().enableRead();
 }
 if (flowControlledTarget.size() == 0) {
-LOGGER.debug("This frontend connection remove flow control because of release:{} ", this.getSource());
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This frontend connection remove flow control because of release:{} ", this.getSource());
 shardingService.getConnection().setFrontWriteFlowControlled(false);
 }
 }
@@ -1103,9 +1108,10 @@
 @Override
 public String toString() {
 StringBuilder sb = new StringBuilder();
-sb.append("NonBlockSession with target ");
+sb.append("NonBlockSession with target = [");
 for (Map.Entry<RouteResultsetNode, BackendConnection> entry : target.entrySet())
-sb.append(" rrs = [").append(entry.getKey()).append("] with connection [").append(entry.getValue()).append("]");
+sb.append(entry.getKey()).append(" with ").append(entry.getValue().toString2()).append(";");
+sb.append("]");
 return sb.toString();
 }
@@ -34,14 +34,11 @@ public class ServerQueryHandler implements FrontendQueryHandler {

 @Override
 public void query(String sql) {
-if (LOGGER.isDebugEnabled()) {
-LOGGER.debug(service + (sql.length() > 1024 ? sql.substring(0, 1024) + "..." : sql));
-}
 TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "handle-query-sql");
 TraceManager.log(ImmutableMap.of("sql", sql), traceObject);
 try {
 if (service.getSession2().isKilled()) {
-LOGGER.info("sql[" + sql + "] is killed.");
+LOGGER.info("{} sql[{}] is killed.", service.toString(), sql);
 service.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");
 return;
 }
@@ -58,11 +55,15 @@
 String finalSql = sql;
 StatisticListener.getInstance().record(service.getSession2(), r -> r.onFrontendSetSql(service.getSchema(), finalSql));
 this.service.setExecuteSql(sql);
+if (LOGGER.isDebugEnabled()) {
+LOGGER.debug("{} query sql: {}", service.toString3(), (sql.length() > 1024 ? sql.substring(0, 1024) + "..." : sql));
+}
 ShardingServerParse serverParse = ServerParseFactory.getShardingParser();
 int rs = serverParse.parse(sql);
 boolean isWithHint = serverParse.startWithHint(sql);
 int sqlType = rs & 0xff;
 if (isWithHint) {
+service.controlTx(TransactionOperate.QUERY);
 if (sqlType == ServerParse.INSERT || sqlType == ServerParse.DELETE || sqlType == ServerParse.UPDATE ||
 sqlType == ServerParse.DDL) {
 if (readOnly) {
@@ -257,7 +257,8 @@ public abstract class BackendService extends AbstractService {
 if (needCalcReadingData(task) != null) {
 int currentReadSize = readSize.addAndGet(((NormalServiceTask) task).getOrgData().length);
 if (currentReadSize > connection.getFlowHighLevel()) {
-LOGGER.debug("This backend connection begins flow control, currentReadingSize= {},conn info:{}", currentReadSize, connection);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This backend connection begins flow control, currentReadingSize= {},conn info:{}", currentReadSize, connection);
 connection.disableRead();
 }
 }
@@ -269,7 +270,8 @@
 int currentReadSize = readSize.addAndGet(-((NormalServiceTask) task).getOrgData().length);
 if (currentReadSize <= connection.getFlowLowLevel() &&
 !businessService.isFlowControlled()) {
-LOGGER.debug("This backend connection stop flow control, currentReadingSize= {},conn info:{}", currentReadSize, connection);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("This backend connection stop flow control, currentReadingSize= {},conn info:{}", currentReadSize, connection);
 connection.enableRead();
 }
 }
@@ -48,7 +48,7 @@ public class ManagerQueryHandler {
 return;
 }
 FrontendConnection con = service.getConnection();
-LOGGER.info("execute manager cmd from {}@{}:{}: {} ", service.getUser(), con.getHost(), con.getLocalPort(), sql);
+LOGGER.info("execute manager cmd from {}: {} ", con.getSimple(), sql);
 }
 switch (sqlType) {
 case ManagerParse.SELECT:
@@ -143,7 +143,7 @@ public class ManagerService extends FrontendService<ManagerUserConfig> {

 @Override
 public String toString() {
-return "ManagerService [user = " + user + " sql = " + executeSql + " schema = " + schema + " ] With connection " + connection.toString();
+return "ManagerService [user = " + user + " sql = " + executeSql + " schema = " + schema + " ] with " + connection.toString();
 }

 }
@@ -52,7 +52,8 @@ public final class PauseEnd {
 service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "No shardingNode paused");
 return;
 }
-LOGGER.debug("{}", pauseInfo);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("{}", pauseInfo);

 if (!PauseShardingNodeManager.getInstance().getDistributeLock()) {
 service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "other instance is in operation");
@@ -110,7 +110,7 @@ public final class ShowSession {
 return null;
 }
 for (BackendConnection backCon : backConnections) {
-sb.append(backCon).append("\r\n");
+sb.append(backCon.toString2()).append("\r\n");
 }
 return createRowDataPacket(FIELD_COUNT, sc.getConnection().getId(), cnCount, sb.toString(), charset);
 }
@@ -642,8 +642,8 @@ public class MySQLResponseService extends BackendService {
 @Override
 public String toString() {
 return "MySQLResponseService[isExecuting = " + isExecuting + " attachment = " + attachment + " autocommitSynced = " + autocommitSynced + " isolationSynced = " + isolationSynced +
-" xaStatus = " + xaStatus + " isDDL = " + isDDL + " complexQuery = " + complexQuery + "] with response handler [" + responseHandler + "] with rrs = [" +
-attachment + "] with connection " + connection.toString();
+" xaStatus = " + xaStatus + " isDDL = " + isDDL + " complexQuery = " + complexQuery + "] with response handler [" + responseHandler +
+"] with " + connection.toString();
 }

 }
@@ -40,6 +40,7 @@ public class MySQLShardingSQLHandler {
 TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "route&execute");
 try {
 if (service.getSession2().isKilled()) {
+LOGGER.info("{} sql[{}] is killed.", service.toString2(), service.getExecuteSql());
 service.writeErrMessage(ErrorCode.ER_QUERY_INTERRUPTED, "The query is interrupted.");
 return;
 }
@@ -643,13 +643,26 @@ public class ShardingService extends BusinessService<ShardingUserConfig> {
 return txInterruptMsg;
 }

-public String toString() {
+public String toString() { // show all
 String tmpSql = null;
 if (executeSql != null) {
 tmpSql = executeSql.length() > 1024 ? executeSql.substring(0, 1024) + "..." : executeSql;
 }
-return "ShardingService[ user = " + user + " schema = " + schema + " executeSql = " + tmpSql + " txInterruptMsg = " + txInterruptMsg +
-" sessionReadOnly = " + sessionReadOnly + "] with connection " + connection.toString() + " with session " + session.toString();
+return "ShardingService[" + user + " schema = " + schema + " executeSql = " + tmpSql +
+" isInTransaction = " + isInTransaction() + " txId=" + getTxId() + " txInterruptMsg = " + txInterruptMsg +
+", sessionReadOnly = " + sessionReadOnly + "] with " + connection.toString() + " with " + session;
 }

+// not show 'with session'
+public String toString2() {
+return "ShardingService[" + user + ", schema = " + schema +
+" isInTransaction = " + isInTransaction() + " txId=" + getTxId() + " txInterruptMsg = " + txInterruptMsg +
+" sessionReadOnly = " + sessionReadOnly + "] with " + connection.toString();
+}
+
+// not show 'isInTransaction = txId= '、'with session'
+public String toString3() {
+return "ShardingService[" + user + " schema = " + schema + " txInterruptMsg = " + txInterruptMsg +
+" sessionReadOnly = " + sessionReadOnly + "] with " + connection.toString();
+}
 }
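A hypothetical usage note (the call site below is illustrative, but toString2() is the variant defined in the hunk above): the lighter toString2()/toString3() summaries let guarded debug statements describe a service without serializing the attached session, which is exactly how the TransactionHandlerManager hunks earlier in this diff use them.

// Hypothetical call site; toString2() omits the "with session" suffix, so the
// guarded debug line stays cheap even for sessions holding many backend connections.
if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("{} execute commit(), current {}", session.getShardingService().toString2(), session);
}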
@@ -104,7 +104,8 @@ public class RWSplitMultiHandler extends RWSplitHandler {
 We must prevent same buffer called connection.write() twice.
 According to the above, you need write buffer immediately and set buffer to null.
 */
-LOGGER.debug("Because of multi query had send.It would receive more than one ResultSet. recycle resource should be delayed. client:{}", rwSplitService);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("Because of multi query had send.It would receive more than one ResultSet. recycle resource should be delayed. client:{}", rwSplitService);
 buffer = frontedConnection.getService().writeToBuffer(eof, buffer);
 frontedConnection.getService().writeDirectly(buffer, WriteFlags.MULTI_QUERY_PART);
 buffer = null;
@@ -5,8 +5,10 @@ import com.actiontech.dble.route.parser.DbleHintParser;
 import com.actiontech.dble.route.parser.util.ParseUtil;
 import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
 import com.actiontech.dble.server.parser.RwSplitServerParse;
+import com.actiontech.dble.singleton.TraceManager;
 import com.actiontech.dble.statistic.sql.StatisticListener;
 import com.actiontech.dble.util.StringUtil;
+import com.google.common.collect.ImmutableMap;

 public final class RWSplitMultiQueryHandler extends RWSplitQueryHandler {
@@ -16,6 +18,8 @@

 @Override
 public void query(String sql) {
+TraceManager.TraceObject traceObject = TraceManager.serviceTrace(session.getService(), "handle-multi-query-sql");
+TraceManager.log(ImmutableMap.of("sql", sql), traceObject);
 try {
 session.getService().queryCount();
 if (!session.getService().isMultiStatementAllow()) {
@@ -85,6 +89,8 @@
 } catch (Exception e) {
 LOGGER.warn("execute error", e);
 session.getService().writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
+} finally {
+TraceManager.finishSpan(traceObject);
 }
 }
 }
@@ -463,7 +463,7 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {
 tmpSql = executeSql.length() > 1024 ? executeSql.substring(0, 1024) + "..." : executeSql;
 }

-return "RWSplitService[ user = " + user + " schema = " + schema + " executeSql = " + tmpSql +
-" sessionReadOnly = " + sessionReadOnly + "] with connection " + connection.toString() + " with session " + session.toString();
+return "RWSplitService[" + user + " schema = " + schema + " executeSql = " + tmpSql +
+" sessionReadOnly = " + sessionReadOnly + "], with " + connection.toString() + ", with" + session.toString();
 }
 }
@@ -53,7 +53,8 @@ public final class RoutePenetrationManager {
 rules.forEach(PenetrationRule::init);
 }
 LOGGER.info("init {} route-penetration rules success", rules.size());
-LOGGER.debug("route-penetration rules :{}", rules);
+if (LOGGER.isDebugEnabled())
+LOGGER.debug("route-penetration rules :{}", rules);
 } catch (Exception e) {
 final String msg = "can't parse the route-penetration rule, please check the 'routePenetrationRules', detail exception is :" + e;
 LOGGER.error(msg);