#1675 before refactor, some bugs

This commit is contained in:
yanhuqing
2020-03-06 10:43:51 +08:00
parent f53586ea75
commit cc070243d6
7 changed files with 73 additions and 11 deletions
@@ -13,11 +13,14 @@ import com.actiontech.dble.server.NonBlockingSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public abstract class AbstractRollbackNodesHandler extends MultiNodeHandler implements RollbackNodesHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractRollbackNodesHandler.class);
protected Set<BackendConnection> closedConnSet;
public AbstractRollbackNodesHandler(NonBlockingSession session) {
super(session);
@@ -56,4 +59,21 @@ public abstract class AbstractRollbackNodesHandler extends MultiNodeHandler impl
}
protected boolean checkClosedConn(BackendConnection conn) {
lock.lock();
try {
if (closedConnSet == null) {
closedConnSet = new HashSet<>(1);
closedConnSet.add(conn);
} else {
if (closedConnSet.contains(conn)) {
return true;
}
closedConnSet.add(conn);
}
return false;
} finally {
lock.unlock();
}
}
}
@@ -109,6 +109,9 @@ public class NormalRollbackNodesHandler extends AbstractRollbackNodesHandler {
@Override
public void connectionClose(BackendConnection conn, String reason) {
if (checkClosedConn(conn)) {
return;
}
// quitted
this.setFail(reason);
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
@@ -108,7 +108,13 @@ public final class XACheckHandler {
List<Map<String, String>> xaRows = result.getResult();
for (Map<String, String> row : xaRows) {
String tempXid = row.get("data");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("check xid is " + xid + " tmp xid is " + tempXid);
}
if (tempXid.equalsIgnoreCase(xid.replace("\'", ""))) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("find xid!check xid is " + xid + " tmp xid is " + tempXid);
}
isExistXid = true;
break;
}
@@ -140,7 +140,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
this.debugCommitDelay();
}
preparePhase(mysqlCon);
} else if (state == TxState.TX_PREPARED_STATE) {
if (position == 0) {
@@ -198,6 +197,15 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
private void endPhase(MySQLConnection mysqlCon) {
if (mysqlCon.isClosed()) {
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
}
return;
}
RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment();
String xaTxId = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue());
XaDelayProvider.delayBeforeXaEnd(rrn.getName(), xaTxId);
@@ -208,6 +216,15 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
private void preparePhase(MySQLConnection mysqlCon) {
if (mysqlCon.isClosed()) {
mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE);
if (decrementToZero(mysqlCon)) {
nextParse();
}
return;
}
RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment();
String xaTxId = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue());
XaDelayProvider.delayBeforeXaPrepare(rrn.getName(), xaTxId);
@@ -222,8 +239,9 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
private void commitPhase(MySQLConnection mysqlCon) {
if (session.getXaState() == TxState.TX_COMMIT_FAILED_STATE) {
if (session.getXaState() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.isClosed()) {
MySQLConnection newConn = session.freshConn(mysqlCon, this);
checkClosedConn(mysqlCon);
if (!newConn.equals(mysqlCon)) {
xaOldThreadIds.putIfAbsent(mysqlCon.getAttachment(), mysqlCon.getThreadId());
mysqlCon = newConn;
@@ -330,6 +348,9 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
if (errPacket.getErrNo() == ErrorCode.ER_XAER_NOTA) {
RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment();
String xid = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("check xid is " + xid);
}
XACheckHandler handler = new XACheckHandler(xid, mysqlCon.getSchema(), rrn.getName(), mysqlCon.getPool().getDbPool().getSource());
// if mysql connection holding xa transaction wasn't released, may result in ER_XAER_NOTA.
// so we need check xid here
@@ -389,6 +410,9 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void connectionClose(final BackendConnection conn, final String reason) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("connectionClose " + conn);
}
this.waitUntilSendFinish();
if (checkClosedConn(conn)) {
return;
@@ -478,7 +502,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
if (errConn != null) {
final String xaId = session.getSessionXaID();
XAStateLog.saveXARecoveryLog(xaId, session.getXaState());
if (++tryCommitTimes < COMMIT_TIMES) {
if (DbleServer.getInstance().getConfig().getSystem().getUseSerializableMode() == 1 || ++tryCommitTimes < COMMIT_TIMES) {
// try commit several times
LOGGER.warn("fail to COMMIT xa transaction " + xaId + " at the " + tryCommitTimes + "th time!");
XaDelayProvider.beforeInnerRetry(tryCommitTimes, xaId);
@@ -118,6 +118,9 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
if (position == 0 && participantLogEntry != null) {
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), session.getXaState());
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("executeRollback status:" + session.getXaState());
}
if (session.getXaState() == TxState.TX_STARTED_STATE) {
if (participantLogEntry == null) {
participantLogEntry = new ParticipantLogEntry[participantLogSize];
@@ -129,7 +132,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
}
endPhase(mysqlCon);
} else if (session.getXaState() == TxState.TX_PREPARED_STATE) {
if (position == 0) {
if (!XAStateLog.saveXARecoveryLog(session.getSessionXaID(), TxState.TX_ROLLBACKING_STATE)) {
@@ -139,11 +141,15 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
this.debugRollbackDelay();
}
if (mysqlCon.isClosed()) {
mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE);
}
rollbackPhase(mysqlCon);
} else if (session.getXaState() == TxState.TX_ROLLBACK_FAILED_STATE || session.getXaState() == TxState.TX_PREPARE_UNCONNECT_STATE) {
if (mysqlCon.isClosed()) {
mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE);
}
rollbackPhase(mysqlCon);
} else if (session.getXaState() == TxState.TX_ENDED_STATE) {
if (position == 0) {
this.debugRollbackDelay();
@@ -182,6 +188,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
private void rollbackPhase(MySQLConnection mysqlCon) {
if (mysqlCon.getXaStatus() == TxState.TX_ROLLBACK_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) {
MySQLConnection newConn = session.freshConn(mysqlCon, this);
checkClosedConn(mysqlCon);
if (!newConn.equals(mysqlCon)) {
RouteResultsetNode rrn = (RouteResultsetNode) mysqlCon.getAttachment();
xaOldThreadIds.putIfAbsent(rrn, mysqlCon.getThreadId());
@@ -345,6 +352,9 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
@Override
public void connectionError(Throwable e, BackendConnection conn) {
this.waitUntilSendFinish();
if (checkClosedConn(conn)) {
return;
}
if (conn instanceof MySQLConnection) {
boolean finished;
lock.lock();
@@ -462,7 +472,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
session.getSource().write(send);
//partially committed,must commit again
//partially rollbacked, must rollback again
} else if (session.getXaState() == TxState.TX_ROLLBACK_FAILED_STATE || session.getXaState() == TxState.TX_PREPARED_STATE ||
session.getXaState() == TxState.TX_PREPARE_UNCONNECT_STATE) {
if (session.isKilled()) {
@@ -593,7 +593,6 @@ public class ServerConnection extends FrontendConnection {
@Override
public void write(byte[] data) {
SerializableLock.getInstance().unLock(this.id);
markFinished();
super.write(data);
if (session.isDiscard() || session.isKilled()) {
@@ -38,7 +38,7 @@ public final class SerializableLock {
}
this.id.set(frontId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("locked id " + frontId + ", trace" + printTrace());
LOGGER.debug("locked id " + frontId + " success, trace " + printTrace());
}
}
}
@@ -46,11 +46,11 @@ public final class SerializableLock {
public void unLock(long frontId) {
if (DbleServer.getInstance().getConfig().getSystem().getUseSerializableMode() == 1) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unlock id " + frontId + ", trace" + printTrace());
LOGGER.debug("try unlock id " + frontId + ", trace" + printTrace());
}
if (this.id.get() == frontId) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unlock id " + frontId + ", success");
LOGGER.debug("unlocked id " + frontId + " success, trace " + printTrace());
}
working.set(false);
}