commit problem #332

This commit is contained in:
yanhuqing
2017-11-28 11:37:19 +08:00
parent 40aa4209b7
commit e993c1946d
9 changed files with 134 additions and 90 deletions

View File

@@ -725,10 +725,14 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
if (session.getXaState() == null) {
NormalAutoCommitNodesHandler autoHandler = new NormalAutoCommitNodesHandler(session, data);
autoHandler.commit();
if (autoHandler.init()) {
autoHandler.commit();
}
} else {
XAAutoCommitNodesHandler autoHandler = new XAAutoCommitNodesHandler(session, data, rrs.getNodes());
autoHandler.commit();
if (autoHandler.init()) {
autoHandler.commit();
}
}
} else {
if (session.getXaState() == null) {

View File

@@ -23,9 +23,9 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implements CommitNodesHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractCommitNodesHandler.class);
protected Lock lockForErrorHandle = new ReentrantLock();
protected Condition sendFinished = lockForErrorHandle.newCondition();
protected volatile boolean sendFinishedFlag = true;
private Lock lockForErrorHandle = new ReentrantLock();
private Condition sendFinished = lockForErrorHandle.newCondition();
private volatile boolean sendFinishedFlag = false;
public AbstractCommitNodesHandler(NonBlockingSession session) {
super(session);
@@ -33,6 +33,18 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem
protected abstract boolean executeCommit(MySQLConnection mysqlCon, int position);
protected boolean initResponse() {
boolean isNormal = true;
for (RouteResultsetNode rrn : session.getTargetKeys()) {
BackendConnection conn = session.getTarget(rrn);
conn.setResponseHandler(this);
if (conn.isClosedOrQuit()) {
isNormal = false;
}
}
return isNormal;
}
@Override
public void commit() {
final int initCount = session.getTargetCount();
@@ -55,7 +67,6 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem
sendFinishedFlag = false;
for (RouteResultsetNode rrn : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(rrn);
conn.setResponseHandler(this);
if (!executeCommit((MySQLConnection) conn, position++)) {
break;
}
@@ -114,4 +125,16 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem
public void debugCommitDelay() {
}
protected void waitUntilSendFinish() {
this.lockForErrorHandle.lock();
try {
if (!this.sendFinishedFlag) {
this.sendFinished.await();
}
} catch (Exception e) {
LOGGER.info("back Response is closed by thread interrupted");
} finally {
lockForErrorHandle.unlock();
}
}
}

View File

@@ -21,7 +21,7 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractRollbackNodesHandler extends MultiNodeHandler implements RollbackNodesHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractRollbackNodesHandler.class);
protected volatile boolean sendFinishedFlag = true;
protected volatile boolean sendFinishedFlag = false;
protected Lock lockForErrorHandle = new ReentrantLock();
protected Condition sendFinished = lockForErrorHandle.newCondition();

View File

@@ -7,6 +7,6 @@ package com.actiontech.dble.backend.mysql.nio.handler.transaction;
public interface CommitNodesHandler {
void commit();
boolean init();
void clearResources();
}

View File

@@ -14,6 +14,18 @@ import com.actiontech.dble.server.NonBlockingSession;
public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
protected byte[] sendData;
@Override
public boolean init() {
if (initResponse()) {
return true;
} else {
String reason = "backend conn closed";
session.clearResources(true);
createErrPkg(reason).write(session.getSource());
return false;
}
}
@Override
public void clearResources() {
sendData = null;
@@ -31,6 +43,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
this.waitUntilSendFinish();
if (decrementCountBy(1)) {
if (sendData == null) {
sendData = ok;
@@ -41,6 +54,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void errorResponse(byte[] err, BackendConnection conn) {
this.waitUntilSendFinish();
ErrorPacket errPacket = new ErrorPacket();
errPacket.read(err);
String errMsg = new String(errPacket.getMessage());
@@ -53,6 +67,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void connectionError(Throwable e, BackendConnection conn) {
this.waitUntilSendFinish();
LOGGER.warn("backend connect", e);
this.setFail(e.getMessage());
conn.quit();
@@ -63,6 +78,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void connectionClose(BackendConnection conn, String reason) {
this.waitUntilSendFinish();
this.setFail(reason);
conn.quit();
if (decrementCountBy(1)) {

View File

@@ -5,9 +5,13 @@
package com.actiontech.dble.backend.mysql.nio.handler.transaction.xa;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import java.util.ArrayList;
import java.util.List;
public class XAAutoCommitNodesHandler extends XACommitNodesHandler {
private RouteResultsetNode[] nodes;
@@ -17,11 +21,40 @@ public class XAAutoCommitNodesHandler extends XACommitNodesHandler {
this.nodes = nodes;
}
@Override
public boolean init() {
boolean isNormal = true;
List<BackendConnection> errConnection = null;
for (RouteResultsetNode rrn : session.getTargetKeys()) {
BackendConnection conn = session.getTarget(rrn);
conn.setResponseHandler(this);
if (conn.isClosedOrQuit()) {
if (errConnection == null) {
errConnection = new ArrayList<>(1);
}
errConnection.add(conn);
isNormal = false;
}
}
if (isNormal) {
return true;
} else {
String reason = "backend conn closed";
sendData = makeErrorPacket(reason);
autoRollback(errConnection);
return false;
}
}
private void autoRollback(List<BackendConnection> errConnection) {
XAAutoRollbackNodesHandler autoHandler = new XAAutoRollbackNodesHandler(session, sendData, nodes, errConnection);
autoHandler.rollback();
}
@Override
protected void nextParse() {
if (this.isFail()) {
XAAutoRollbackNodesHandler autoHandler = new XAAutoRollbackNodesHandler(session, sendData, nodes, null);
autoHandler.rollback();
autoRollback(null);
} else {
commit();
}

View File

@@ -25,12 +25,26 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
private static final int COMMIT_TIMES = 5;
private int tryCommitTimes = 0;
private ParticipantLogEntry[] participantLogEntry = null;
protected byte[] sendData = OkPacket.OK;
byte[] sendData = OkPacket.OK;
public XACommitNodesHandler(NonBlockingSession session) {
super(session);
}
@Override
public boolean init() {
if (initResponse()) {
return true;
} else {
String reason = "backend conn closed";
session.getSource().setTxInterrupt(reason);
sendData = makeErrorPacket(reason);
session.getSource().write(sendData);
LOGGER.warn("init failed:" + reason);
return false;
}
}
@Override
public void clearResources() {
tryCommitTimes = 0;
@@ -41,7 +55,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
@Override
protected boolean executeCommit(MySQLConnection mysqlCon, int position) {
TxState state = session.getXaState();
if (state == TxState.TX_STARTED_STATE) {
if (participantLogEntry == null) {
participantLogEntry = new ParticipantLogEntry[nodeCount];
@@ -65,7 +78,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
} else if (state == TxState.TX_PREPARED_STATE) {
if (position == 0) {
if (!XAStateLog.saveXARecoveryLog(session.getSessionXaID(), TxState.TX_COMMITTING_STATE)) {
String errMsg = "saveXARecoveryLog error, the stage is TX_COMMITING_STATE";
String errMsg = "saveXARecoveryLog error, the stage is TX_COMMITTING_STATE";
this.setFail(errMsg);
sendData = makeErrorPacket(errMsg);
nextParse();
@@ -88,7 +101,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
public void run() {
ErrorPacket error = new ErrorPacket();
error.setErrNo(ER_ERROR_DURING_COMMIT);
error.setMessage(errorMsg == null ? "unknow error".getBytes() : errorMsg.getBytes());
error.setMessage(errorMsg == null ? "unknown error".getBytes() : errorMsg.getBytes());
XAAutoRollbackNodesHandler nextHandler = new XAAutoRollbackNodesHandler(session, error.toBytes(), null, null);
nextHandler.rollback();
}
@@ -98,7 +111,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
return true;
}
private byte[] makeErrorPacket(String errMsg) {
byte[] makeErrorPacket(String errMsg) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.setErrNo(ErrorCode.ER_UNKNOWN_ERROR);
errPacket.setMessage(StringUtil.encode(errMsg, session.getSource().getCharset().getResults()));
@@ -197,7 +210,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
nextParse();
}
// 'xa commit' err
} else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { //TODO:service degradation?
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
@@ -217,36 +229,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
String errMsg = new String(StringUtil.encode(e.getMessage(), session.getSource().getCharset().getResults()));
this.setFail(errMsg);
sendData = makeErrorPacket(errMsg);
if (conn instanceof MySQLConnection) {
MySQLConnection mysqlCon = (MySQLConnection) conn;
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
}
// 'xa prepare' connectionError
} else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) {
mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE);
if (decrementCountBy(1)) {
nextParse();
}
// 'xa commit' connectionError
} else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { //TODO:service degradation?
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
}
}
}
innerConnectError(conn);
}
@Override
@@ -261,10 +244,14 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
public void connectionCloseLocal(BackendConnection conn, String reason) {
private void connectionCloseLocal(BackendConnection conn, String reason) {
this.waitUntilSendFinish();
this.setFail(reason);
sendData = makeErrorPacket(reason);
innerConnectError(conn);
}
private void innerConnectError(BackendConnection conn) {
if (conn instanceof MySQLConnection) {
MySQLConnection mysqlCon = (MySQLConnection) conn;
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
@@ -275,7 +262,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
}
// 'xa prepare' connectionClose,conn has quit
} else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) {
mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE);
@@ -284,7 +270,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
if (decrementCountBy(1)) {
nextParse();
}
// 'xa commit' connectionClose
} else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) { //TODO:service degradation?
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
@@ -319,7 +304,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
session.getSource().write(send);
// partitionly commited,must commit again
// partially committed,must commit again
} else if (session.getXaState() == TxState.TX_COMMIT_FAILED_STATE) {
MySQLConnection errConn = session.releaseExcept(TxState.TX_COMMIT_FAILED_STATE);
if (errConn != null) {
@@ -380,18 +365,4 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
}
public void waitUntilSendFinish() {
this.lockForErrorHandle.lock();
try {
if (!this.sendFinishedFlag) {
this.sendFinished.await();
}
} catch (Exception e) {
LOGGER.info("back Response is closed by thread interrupted");
} finally {
lockForErrorHandle.unlock();
}
return;
}
}

View File

@@ -26,7 +26,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
private static final int ROLLBACK_TIMES = 5;
private int tryRollbackTimes = 0;
private ParticipantLogEntry[] participantLogEntry = null;
protected byte[] sendData = OkPacket.OK;
byte[] sendData = OkPacket.OK;
public XARollbackNodesHandler(NonBlockingSession session) {
super(session);
@@ -165,7 +165,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
// 'xa rollback' ok without prepared
} else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) {
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
@@ -175,11 +174,11 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
}
// 'xa rollback' ok
} else if (mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE || mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE || mysqlCon.getXaStatus() == TxState.TX_ROLLBACK_FAILED_STATE) { // we dont' konw if the conn prepared or not
} else if (mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE ||
mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE ||
mysqlCon.getXaStatus() == TxState.TX_ROLLBACK_FAILED_STATE) {
// we don't know if the conn prepared or not
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
@@ -189,8 +188,8 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
cleanAndFeedback();
}
} else { // LOGGER.error("Wrong XA status flag!");
} else {
LOGGER.warn("Wrong XA status flag!");
}
}
}
@@ -229,7 +228,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
cleanAndFeedback();
}
// we dont' konw if the conn prepared or not
// we don't know if the conn prepared or not
} else if (mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.read(err);
@@ -251,8 +250,8 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
cleanAndFeedback();
}
}
} else { // LOGGER.error("Wrong XA status flag!");
} else {
LOGGER.warn("Wrong XA status flag!");
}
}
}
@@ -290,16 +289,15 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
if (decrementCountBy(1)) {
cleanAndFeedback();
}
// we dont' konw if the conn prepared or not
// we don't know if the conn prepared or not
} else if (mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) {
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
cleanAndFeedback();
}
} else { // LOGGER.error("Wrong XA status flag!");
} else {
LOGGER.warn("Wrong XA status flag!");
}
}
}
@@ -318,7 +316,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
// 'xa rollback' ok without prepared
} else if (mysqlCon.getXaStatus() == TxState.TX_ENDED_STATE) {
mysqlCon.quit();
@@ -328,7 +325,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
}
// 'xa rollback' err
} else if (mysqlCon.getXaStatus() == TxState.TX_PREPARED_STATE) {
mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE);
@@ -338,8 +334,8 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
if (decrementCountBy(1)) {
cleanAndFeedback();
}
} else { // LOGGER.error("Wrong XA status flag!");
} else {
LOGGER.warn("Wrong XA status flag!");
}
}
}
@@ -355,7 +351,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
session.getSource().write(send);
//partitionly commited,must commit again
//partially committed,must commit again
} else if (session.getXaState() == TxState.TX_ROLLBACK_FAILED_STATE || session.getXaState() == TxState.TX_PREPARED_STATE ||
session.getXaState() == TxState.TX_PREPARE_UNCONNECT_STATE) {
MySQLConnection errConn = session.releaseExcept(session.getXaState());
@@ -364,7 +360,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
if (++tryRollbackTimes < ROLLBACK_TIMES) {
rollback();
} else {
StringBuilder closeReason = new StringBuilder("ROLLBCAK FAILED but it will try to ROLLBACK repeatedly in backend until it is success!");
StringBuilder closeReason = new StringBuilder("ROLLBACK FAILED but it will try to ROLLBACK repeatedly in backend until it is success!");
if (error != null) {
closeReason.append(", the ERROR is ");
closeReason.append(error);
@@ -383,7 +379,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
}
// rollbak success,but closed coon must remove
// rollback success,but closed coon must remove
} else {
removeQuitConn();
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), TxState.TX_ROLLBACKED_STATE);
@@ -407,7 +403,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
public void debugRollbackDelay() {
private void debugRollbackDelay() {
try {
if (LOGGER.isDebugEnabled()) {
String rollbackDelayTime = System.getProperty("ROLLBACK_DELAY");
@@ -421,7 +417,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
}
public void waitUntilSendFinish() {
private void waitUntilSendFinish() {
this.lockForErrorHandle.lock();
try {
if (!this.sendFinishedFlag) {
@@ -432,7 +428,6 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
} finally {
lockForErrorHandle.unlock();
}
return;
}
}

View File

@@ -317,7 +317,9 @@ public class NonBlockingSession implements Session {
}
checkBackupStatus();
createCommitNodesHandler();
commitHandler.commit();
if (commitHandler.init()) {
commitHandler.commit();
}
}
public void checkBackupStatus() {