From cc070243d60a4fef6333d05e44aa108bf97ef51a Mon Sep 17 00:00:00 2001 From: yanhuqing Date: Fri, 6 Mar 2020 10:43:51 +0800 Subject: [PATCH] #1675 before refactor, some bugs --- .../AbstractRollbackNodesHandler.java | 20 +++++++++++++ .../normal/NormalRollbackNodesHandler.java | 3 ++ .../transaction/xa/XACheckHandler.java | 6 ++++ .../transaction/xa/XACommitNodesHandler.java | 30 +++++++++++++++++-- .../xa/XARollbackNodesHandler.java | 18 ++++++++--- .../dble/server/ServerConnection.java | 1 - .../dble/singleton/SerializableLock.java | 6 ++-- 7 files changed, 73 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java index 92650cef6..4be84b167 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java @@ -13,11 +13,14 @@ import com.actiontech.dble.server.NonBlockingSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; import java.util.List; +import java.util.Set; public abstract class AbstractRollbackNodesHandler extends MultiNodeHandler implements RollbackNodesHandler { protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractRollbackNodesHandler.class); + protected Set closedConnSet; public AbstractRollbackNodesHandler(NonBlockingSession session) { super(session); @@ -56,4 +59,21 @@ public abstract class AbstractRollbackNodesHandler extends MultiNodeHandler impl } + protected boolean checkClosedConn(BackendConnection conn) { + lock.lock(); + try { + if (closedConnSet == null) { + closedConnSet = new HashSet<>(1); + closedConnSet.add(conn); + } else { + if (closedConnSet.contains(conn)) { + return true; + } + closedConnSet.add(conn); + } + return false; + } finally { + lock.unlock(); + } + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalRollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalRollbackNodesHandler.java index 74b714bd2..86144b12a 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalRollbackNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalRollbackNodesHandler.java @@ -109,6 +109,9 @@ public class NormalRollbackNodesHandler extends AbstractRollbackNodesHandler { @Override public void connectionClose(BackendConnection conn, String reason) { + if (checkClosedConn(conn)) { + return; + } // quitted this.setFail(reason); RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment(); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACheckHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACheckHandler.java index 9e2c7e975..bc2e7984b 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACheckHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACheckHandler.java @@ -108,7 +108,13 @@ public final class XACheckHandler { List> xaRows = result.getResult(); for (Map row : xaRows) { String tempXid = row.get("data"); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("check xid is " + xid + " tmp xid is " + tempXid); + } if (tempXid.equalsIgnoreCase(xid.replace("\'", ""))) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("find xid!check xid is " + xid + " tmp xid is " + tempXid); + } isExistXid = true; break; } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java index a109bf645..60e303bd8 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java @@ -140,7 +140,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } this.debugCommitDelay(); } - preparePhase(mysqlCon); } else if (state == TxState.TX_PREPARED_STATE) { if (position == 0) { @@ -198,6 +197,15 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } private void endPhase(MySQLConnection mysqlCon) { + if (mysqlCon.isClosed()) { + mysqlCon.setXaStatus(TxState.TX_CONN_QUIT); + XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); + if (decrementToZero(mysqlCon)) { + session.setXaState(TxState.TX_ENDED_STATE); + nextParse(); + } + return; + } RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment(); String xaTxId = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue()); XaDelayProvider.delayBeforeXaEnd(rrn.getName(), xaTxId); @@ -208,6 +216,15 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } private void preparePhase(MySQLConnection mysqlCon) { + if (mysqlCon.isClosed()) { + mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE); + XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); + session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE); + if (decrementToZero(mysqlCon)) { + nextParse(); + } + return; + } RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment(); String xaTxId = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue()); XaDelayProvider.delayBeforeXaPrepare(rrn.getName(), xaTxId); @@ -222,8 +239,9 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } private void commitPhase(MySQLConnection mysqlCon) { - if (session.getXaState() == TxState.TX_COMMIT_FAILED_STATE) { + if (session.getXaState() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.isClosed()) { MySQLConnection newConn = session.freshConn(mysqlCon, this); + checkClosedConn(mysqlCon); if (!newConn.equals(mysqlCon)) { xaOldThreadIds.putIfAbsent(mysqlCon.getAttachment(), mysqlCon.getThreadId()); mysqlCon = newConn; @@ -330,6 +348,9 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { if (errPacket.getErrNo() == ErrorCode.ER_XAER_NOTA) { RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment(); String xid = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("check xid is " + xid); + } XACheckHandler handler = new XACheckHandler(xid, mysqlCon.getSchema(), rrn.getName(), mysqlCon.getPool().getDbPool().getSource()); // if mysql connection holding xa transaction wasn't released, may result in ER_XAER_NOTA. // so we need check xid here @@ -389,6 +410,9 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { @Override public void connectionClose(final BackendConnection conn, final String reason) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("connectionClose " + conn); + } this.waitUntilSendFinish(); if (checkClosedConn(conn)) { return; @@ -478,7 +502,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { if (errConn != null) { final String xaId = session.getSessionXaID(); XAStateLog.saveXARecoveryLog(xaId, session.getXaState()); - if (++tryCommitTimes < COMMIT_TIMES) { + if (DbleServer.getInstance().getConfig().getSystem().getUseSerializableMode() == 1 || ++tryCommitTimes < COMMIT_TIMES) { // try commit several times LOGGER.warn("fail to COMMIT xa transaction " + xaId + " at the " + tryCommitTimes + "th time!"); XaDelayProvider.beforeInnerRetry(tryCommitTimes, xaId); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java index 38be9b76b..190fb1627 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java @@ -118,6 +118,9 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { if (position == 0 && participantLogEntry != null) { XAStateLog.saveXARecoveryLog(session.getSessionXaID(), session.getXaState()); } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("executeRollback status:" + session.getXaState()); + } if (session.getXaState() == TxState.TX_STARTED_STATE) { if (participantLogEntry == null) { participantLogEntry = new ParticipantLogEntry[participantLogSize]; @@ -129,7 +132,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { mysqlCon.setXaStatus(TxState.TX_CONN_QUIT); } endPhase(mysqlCon); - } else if (session.getXaState() == TxState.TX_PREPARED_STATE) { if (position == 0) { if (!XAStateLog.saveXARecoveryLog(session.getSessionXaID(), TxState.TX_ROLLBACKING_STATE)) { @@ -139,11 +141,15 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } this.debugRollbackDelay(); } + if (mysqlCon.isClosed()) { + mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE); + } rollbackPhase(mysqlCon); - } else if (session.getXaState() == TxState.TX_ROLLBACK_FAILED_STATE || session.getXaState() == TxState.TX_PREPARE_UNCONNECT_STATE) { + if (mysqlCon.isClosed()) { + mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE); + } rollbackPhase(mysqlCon); - } else if (session.getXaState() == TxState.TX_ENDED_STATE) { if (position == 0) { this.debugRollbackDelay(); @@ -182,6 +188,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { private void rollbackPhase(MySQLConnection mysqlCon) { if (mysqlCon.getXaStatus() == TxState.TX_ROLLBACK_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) { MySQLConnection newConn = session.freshConn(mysqlCon, this); + checkClosedConn(mysqlCon); if (!newConn.equals(mysqlCon)) { RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment(); xaOldThreadIds.putIfAbsent(rrn, mysqlCon.getThreadId()); @@ -345,6 +352,9 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { @Override public void connectionError(Throwable e, BackendConnection conn) { this.waitUntilSendFinish(); + if (checkClosedConn(conn)) { + return; + } if (conn instanceof MySQLConnection) { boolean finished; lock.lock(); @@ -462,7 +472,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } session.getSource().write(send); - //partially committed,must commit again + //partially rollbacked, must rollback again } else if (session.getXaState() == TxState.TX_ROLLBACK_FAILED_STATE || session.getXaState() == TxState.TX_PREPARED_STATE || session.getXaState() == TxState.TX_PREPARE_UNCONNECT_STATE) { if (session.isKilled()) { diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java index 8292acc30..f97536af7 100644 --- a/src/main/java/com/actiontech/dble/server/ServerConnection.java +++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java @@ -593,7 +593,6 @@ public class ServerConnection extends FrontendConnection { @Override public void write(byte[] data) { - SerializableLock.getInstance().unLock(this.id); markFinished(); super.write(data); if (session.isDiscard() || session.isKilled()) { diff --git a/src/main/java/com/actiontech/dble/singleton/SerializableLock.java b/src/main/java/com/actiontech/dble/singleton/SerializableLock.java index b1ee614a3..182aa741c 100644 --- a/src/main/java/com/actiontech/dble/singleton/SerializableLock.java +++ b/src/main/java/com/actiontech/dble/singleton/SerializableLock.java @@ -38,7 +38,7 @@ public final class SerializableLock { } this.id.set(frontId); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("locked id " + frontId + ", trace" + printTrace()); + LOGGER.debug("locked id " + frontId + " success, trace " + printTrace()); } } } @@ -46,11 +46,11 @@ public final class SerializableLock { public void unLock(long frontId) { if (DbleServer.getInstance().getConfig().getSystem().getUseSerializableMode() == 1) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("unlock id " + frontId + ", trace" + printTrace()); + LOGGER.debug("try unlock id " + frontId + ", trace" + printTrace()); } if (this.id.get() == frontId) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("unlock id " + frontId + ", success"); + LOGGER.debug("unlocked id " + frontId + " success, trace " + printTrace()); } working.set(false); }