From e993c1946d76f88f22232d4e38435b508146cdac Mon Sep 17 00:00:00 2001 From: yanhuqing Date: Tue, 28 Nov 2017 11:37:19 +0800 Subject: [PATCH] commit problem #332 --- .../nio/handler/MultiNodeQueryHandler.java | 8 +- .../AbstractCommitNodesHandler.java | 31 +++++++- .../AbstractRollbackNodesHandler.java | 2 +- .../transaction/CommitNodesHandler.java | 2 +- .../normal/NormalCommitNodesHandler.java | 16 ++++ .../xa/XAAutoCommitNodesHandler.java | 37 ++++++++- .../transaction/xa/XACommitNodesHandler.java | 79 ++++++------------- .../xa/XARollbackNodesHandler.java | 45 +++++------ .../dble/server/NonBlockingSession.java | 4 +- 9 files changed, 134 insertions(+), 90 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java index 3f2d995ff..705184d38 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java @@ -725,10 +725,14 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR } if (session.getXaState() == null) { NormalAutoCommitNodesHandler autoHandler = new NormalAutoCommitNodesHandler(session, data); - autoHandler.commit(); + if (autoHandler.init()) { + autoHandler.commit(); + } } else { XAAutoCommitNodesHandler autoHandler = new XAAutoCommitNodesHandler(session, data, rrs.getNodes()); - autoHandler.commit(); + if (autoHandler.init()) { + autoHandler.commit(); + } } } else { if (session.getXaState() == null) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java index be92adddc..15b4d7171 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java @@ -23,9 +23,9 @@ import java.util.concurrent.locks.ReentrantLock; public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implements CommitNodesHandler { protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractCommitNodesHandler.class); - protected Lock lockForErrorHandle = new ReentrantLock(); - protected Condition sendFinished = lockForErrorHandle.newCondition(); - protected volatile boolean sendFinishedFlag = true; + private Lock lockForErrorHandle = new ReentrantLock(); + private Condition sendFinished = lockForErrorHandle.newCondition(); + private volatile boolean sendFinishedFlag = false; public AbstractCommitNodesHandler(NonBlockingSession session) { super(session); @@ -33,6 +33,18 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem protected abstract boolean executeCommit(MySQLConnection mysqlCon, int position); + protected boolean initResponse() { + boolean isNormal = true; + for (RouteResultsetNode rrn : session.getTargetKeys()) { + BackendConnection conn = session.getTarget(rrn); + conn.setResponseHandler(this); + if (conn.isClosedOrQuit()) { + isNormal = false; + } + } + return isNormal; + } + @Override public void commit() { final int initCount = session.getTargetCount(); @@ -55,7 +67,6 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem sendFinishedFlag = false; for (RouteResultsetNode rrn : session.getTargetKeys()) { final BackendConnection conn = session.getTarget(rrn); - conn.setResponseHandler(this); if (!executeCommit((MySQLConnection) conn, position++)) { break; } @@ -114,4 +125,16 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem public void debugCommitDelay() { } + protected void waitUntilSendFinish() { + this.lockForErrorHandle.lock(); + try { + if (!this.sendFinishedFlag) { + this.sendFinished.await(); + } + } catch (Exception e) { + LOGGER.info("back Response is closed by thread interrupted"); + } finally { + lockForErrorHandle.unlock(); + } + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java index dbf006a6e..2b88a9445 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractRollbackNodesHandler.java @@ -21,7 +21,7 @@ import java.util.concurrent.locks.ReentrantLock; public abstract class AbstractRollbackNodesHandler extends MultiNodeHandler implements RollbackNodesHandler { protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractRollbackNodesHandler.class); - protected volatile boolean sendFinishedFlag = true; + protected volatile boolean sendFinishedFlag = false; protected Lock lockForErrorHandle = new ReentrantLock(); protected Condition sendFinished = lockForErrorHandle.newCondition(); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java index f3be5daa3..160d27400 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java @@ -7,6 +7,6 @@ package com.actiontech.dble.backend.mysql.nio.handler.transaction; public interface CommitNodesHandler { void commit(); - + boolean init(); void clearResources(); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java index 81c8f19ce..644213ae2 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java @@ -14,6 +14,18 @@ import com.actiontech.dble.server.NonBlockingSession; public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { protected byte[] sendData; + @Override + public boolean init() { + if (initResponse()) { + return true; + } else { + String reason = "backend conn closed"; + session.clearResources(true); + createErrPkg(reason).write(session.getSource()); + return false; + } + } + @Override public void clearResources() { sendData = null; @@ -31,6 +43,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { @Override public void okResponse(byte[] ok, BackendConnection conn) { + this.waitUntilSendFinish(); if (decrementCountBy(1)) { if (sendData == null) { sendData = ok; @@ -41,6 +54,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { @Override public void errorResponse(byte[] err, BackendConnection conn) { + this.waitUntilSendFinish(); ErrorPacket errPacket = new ErrorPacket(); errPacket.read(err); String errMsg = new String(errPacket.getMessage()); @@ -53,6 +67,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { @Override public void connectionError(Throwable e, BackendConnection conn) { + this.waitUntilSendFinish(); LOGGER.warn("backend connect", e); this.setFail(e.getMessage()); conn.quit(); @@ -63,6 +78,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { @Override public void connectionClose(BackendConnection conn, String reason) { + this.waitUntilSendFinish(); this.setFail(reason); conn.quit(); if (decrementCountBy(1)) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XAAutoCommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XAAutoCommitNodesHandler.java index ff5d0628a..ef8f0d64f 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XAAutoCommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XAAutoCommitNodesHandler.java @@ -5,9 +5,13 @@ package com.actiontech.dble.backend.mysql.nio.handler.transaction.xa; +import com.actiontech.dble.backend.BackendConnection; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.server.NonBlockingSession; +import java.util.ArrayList; +import java.util.List; + public class XAAutoCommitNodesHandler extends XACommitNodesHandler { private RouteResultsetNode[] nodes; @@ -17,11 +21,40 @@ public class XAAutoCommitNodesHandler extends XACommitNodesHandler { this.nodes = nodes; } + @Override + public boolean init() { + boolean isNormal = true; + List errConnection = null; + for (RouteResultsetNode rrn : session.getTargetKeys()) { + BackendConnection conn = session.getTarget(rrn); + conn.setResponseHandler(this); + if (conn.isClosedOrQuit()) { + if (errConnection == null) { + errConnection = new ArrayList<>(1); + } + errConnection.add(conn); + isNormal = false; + } + } + if (isNormal) { + return true; + } else { + String reason = "backend conn closed"; + sendData = makeErrorPacket(reason); + autoRollback(errConnection); + return false; + } + } + + private void autoRollback(List errConnection) { + XAAutoRollbackNodesHandler autoHandler = new XAAutoRollbackNodesHandler(session, sendData, nodes, errConnection); + autoHandler.rollback(); + } + @Override protected void nextParse() { if (this.isFail()) { - XAAutoRollbackNodesHandler autoHandler = new XAAutoRollbackNodesHandler(session, sendData, nodes, null); - autoHandler.rollback(); + autoRollback(null); } else { commit(); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java index 7cbeba1e1..2bbd41e1d 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java @@ -25,12 +25,26 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { private static final int COMMIT_TIMES = 5; private int tryCommitTimes = 0; private ParticipantLogEntry[] participantLogEntry = null; - protected byte[] sendData = OkPacket.OK; + byte[] sendData = OkPacket.OK; public XACommitNodesHandler(NonBlockingSession session) { super(session); } + @Override + public boolean init() { + if (initResponse()) { + return true; + } else { + String reason = "backend conn closed"; + session.getSource().setTxInterrupt(reason); + sendData = makeErrorPacket(reason); + session.getSource().write(sendData); + LOGGER.warn("init failed:" + reason); + return false; + } + } + @Override public void clearResources() { tryCommitTimes = 0; @@ -41,7 +55,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { @Override protected boolean executeCommit(MySQLConnection mysqlCon, int position) { TxState state = session.getXaState(); - if (state == TxState.TX_STARTED_STATE) { if (participantLogEntry == null) { participantLogEntry = new ParticipantLogEntry[nodeCount]; @@ -65,7 +78,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } else if (state == TxState.TX_PREPARED_STATE) { if (position == 0) { if (!XAStateLog.saveXARecoveryLog(session.getSessionXaID(), TxState.TX_COMMITTING_STATE)) { - String errMsg = "saveXARecoveryLog error, the stage is TX_COMMITING_STATE"; + String errMsg = "saveXARecoveryLog error, the stage is TX_COMMITTING_STATE"; this.setFail(errMsg); sendData = makeErrorPacket(errMsg); nextParse(); @@ -88,7 +101,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { public void run() { ErrorPacket error = new ErrorPacket(); error.setErrNo(ER_ERROR_DURING_COMMIT); - error.setMessage(errorMsg == null ? "unknow error".getBytes() : errorMsg.getBytes()); + error.setMessage(errorMsg == null ? "unknown error".getBytes() : errorMsg.getBytes()); XAAutoRollbackNodesHandler nextHandler = new XAAutoRollbackNodesHandler(session, error.toBytes(), null, null); nextHandler.rollback(); } @@ -98,7 +111,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { return true; } - private byte[] makeErrorPacket(String errMsg) { + byte[] makeErrorPacket(String errMsg) { ErrorPacket errPacket = new ErrorPacket(); errPacket.setErrNo(ErrorCode.ER_UNKNOWN_ERROR); errPacket.setMessage(StringUtil.encode(errMsg, session.getSource().getCharset().getResults())); @@ -197,7 +210,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } nextParse(); } - // 'xa commit' err } else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { //TODO:service degradation? mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE); @@ -217,36 +229,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { String errMsg = new String(StringUtil.encode(e.getMessage(), session.getSource().getCharset().getResults())); this.setFail(errMsg); sendData = makeErrorPacket(errMsg); - if (conn instanceof MySQLConnection) { - MySQLConnection mysqlCon = (MySQLConnection) conn; - if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { - mysqlCon.quit(); - mysqlCon.setXaStatus(TxState.TX_CONN_QUIT); - XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); - if (decrementCountBy(1)) { - session.setXaState(TxState.TX_ENDED_STATE); - nextParse(); - } - - // 'xa prepare' connectionError - } else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) { - mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE); - XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); - session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE); - if (decrementCountBy(1)) { - nextParse(); - } - - // 'xa commit' connectionError - } else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { //TODO:service degradation? - mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE); - XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); - session.setXaState(TxState.TX_COMMIT_FAILED_STATE); - if (decrementCountBy(1)) { - cleanAndFeedback(); - } - } - } + innerConnectError(conn); } @Override @@ -261,10 +244,14 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } - public void connectionCloseLocal(BackendConnection conn, String reason) { + private void connectionCloseLocal(BackendConnection conn, String reason) { this.waitUntilSendFinish(); this.setFail(reason); sendData = makeErrorPacket(reason); + innerConnectError(conn); + } + + private void innerConnectError(BackendConnection conn) { if (conn instanceof MySQLConnection) { MySQLConnection mysqlCon = (MySQLConnection) conn; if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { @@ -275,7 +262,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { session.setXaState(TxState.TX_ENDED_STATE); nextParse(); } - // 'xa prepare' connectionClose,conn has quit } else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) { mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE); @@ -284,7 +270,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { if (decrementCountBy(1)) { nextParse(); } - // 'xa commit' connectionClose } else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { //TODO:service degradation? mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE); @@ -319,7 +304,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } session.getSource().write(send); - // partitionly commited,must commit again + // partially committed,must commit again } else if (session.getXaState() == TxState.TX_COMMIT_FAILED_STATE) { MySQLConnection errConn = session.releaseExcept(TxState.TX_COMMIT_FAILED_STATE); if (errConn != null) { @@ -380,18 +365,4 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { } } - - public void waitUntilSendFinish() { - this.lockForErrorHandle.lock(); - try { - if (!this.sendFinishedFlag) { - this.sendFinished.await(); - } - } catch (Exception e) { - LOGGER.info("back Response is closed by thread interrupted"); - } finally { - lockForErrorHandle.unlock(); - } - return; - } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java index c42506c60..79cfaab14 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java @@ -26,7 +26,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { private static final int ROLLBACK_TIMES = 5; private int tryRollbackTimes = 0; private ParticipantLogEntry[] participantLogEntry = null; - protected byte[] sendData = OkPacket.OK; + byte[] sendData = OkPacket.OK; public XARollbackNodesHandler(NonBlockingSession session) { super(session); @@ -165,7 +165,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { session.setXaState(TxState.TX_ENDED_STATE); rollback(); } - // 'xa rollback' ok without prepared } else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) { mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE); @@ -175,11 +174,11 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { session.setXaState(TxState.TX_INITIALIZE_STATE); cleanAndFeedback(); } - // 'xa rollback' ok - } else if (mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE || mysqlCon.getXaStatus() == TxState.TX_ROLLBACK_FAILED_STATE) { // we dont' konw if the conn prepared or not - - + } else if (mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE || + mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE || + mysqlCon.getXaStatus() == TxState.TX_ROLLBACK_FAILED_STATE) { + // we don't know if the conn prepared or not mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE); XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE); @@ -189,8 +188,8 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } cleanAndFeedback(); } - - } else { // LOGGER.error("Wrong XA status flag!"); + } else { + LOGGER.warn("Wrong XA status flag!"); } } } @@ -229,7 +228,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { cleanAndFeedback(); } - // we dont' konw if the conn prepared or not + // we don't know if the conn prepared or not } else if (mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) { ErrorPacket errPacket = new ErrorPacket(); errPacket.read(err); @@ -251,8 +250,8 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { cleanAndFeedback(); } } - - } else { // LOGGER.error("Wrong XA status flag!"); + } else { + LOGGER.warn("Wrong XA status flag!"); } } } @@ -290,16 +289,15 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { if (decrementCountBy(1)) { cleanAndFeedback(); } - - // we dont' konw if the conn prepared or not + // we don't know if the conn prepared or not } else if (mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) { session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE); XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon); if (decrementCountBy(1)) { cleanAndFeedback(); } - - } else { // LOGGER.error("Wrong XA status flag!"); + } else { + LOGGER.warn("Wrong XA status flag!"); } } } @@ -318,7 +316,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { session.setXaState(TxState.TX_ENDED_STATE); rollback(); } - // 'xa rollback' ok without prepared } else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) { mysqlCon.quit(); @@ -328,7 +325,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { session.setXaState(TxState.TX_INITIALIZE_STATE); cleanAndFeedback(); } - // 'xa rollback' err } else if (mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE); @@ -338,8 +334,8 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { if (decrementCountBy(1)) { cleanAndFeedback(); } - - } else { // LOGGER.error("Wrong XA status flag!"); + } else { + LOGGER.warn("Wrong XA status flag!"); } } } @@ -355,7 +351,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } session.getSource().write(send); - //partitionly commited,must commit again + //partially committed,must commit again } else if (session.getXaState() == TxState.TX_ROLLBACK_FAILED_STATE || session.getXaState() == TxState.TX_PREPARED_STATE || session.getXaState() == TxState.TX_PREPARE_UNCONNECT_STATE) { MySQLConnection errConn = session.releaseExcept(session.getXaState()); @@ -364,7 +360,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { if (++tryRollbackTimes < ROLLBACK_TIMES) { rollback(); } else { - StringBuilder closeReason = new StringBuilder("ROLLBCAK FAILED but it will try to ROLLBACK repeatedly in backend until it is success!"); + StringBuilder closeReason = new StringBuilder("ROLLBACK FAILED but it will try to ROLLBACK repeatedly in backend until it is success!"); if (error != null) { closeReason.append(", the ERROR is "); closeReason.append(error); @@ -383,7 +379,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } } - // rollbak success,but closed coon must remove + // rollback success,but closed coon must remove } else { removeQuitConn(); XAStateLog.saveXARecoveryLog(session.getSessionXaID(), TxState.TX_ROLLBACKED_STATE); @@ -407,7 +403,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } - public void debugRollbackDelay() { + private void debugRollbackDelay() { try { if (LOGGER.isDebugEnabled()) { String rollbackDelayTime = System.getProperty("ROLLBACK_DELAY"); @@ -421,7 +417,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } } - public void waitUntilSendFinish() { + private void waitUntilSendFinish() { this.lockForErrorHandle.lock(); try { if (!this.sendFinishedFlag) { @@ -432,7 +428,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler { } finally { lockForErrorHandle.unlock(); } - return; } } diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index 07f6a1e8d..1466d8fd9 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -317,7 +317,9 @@ public class NonBlockingSession implements Session { } checkBackupStatus(); createCommitNodesHandler(); - commitHandler.commit(); + if (commitHandler.init()) { + commitHandler.commit(); + } } public void checkBackupStatus() {