fix xa with virtual node

This commit is contained in:
yanhuqing666
2016-12-18 18:58:50 +08:00
parent ce10aefee6
commit b5559d2358
6 changed files with 67 additions and 63 deletions
+1 -1
View File
@@ -938,13 +938,13 @@ public class MycatServer {
continue;
}
finished =false;
xacmd = xacmd + coordinatorLogEntry.getId() + ';';
outloop: for (SchemaConfig schema : MycatServer.getInstance().getConfig().getSchemas().values()) {
for (TableConfig table : schema.getTables().values()) {
for (String dataNode : table.getDataNodes()) {
PhysicalDBNode dn = MycatServer.getInstance().getConfig().getDataNodes().get(dataNode);
if (participantLogEntry.compareAddress(dn.getDbPool().getSource().getConfig().getIp(), dn.getDbPool().getSource().getConfig().getPort(), dn.getDatabase())) {
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new XARecoverCallback(needCommit?true:false, participantLogEntry));
xacmd = xacmd + coordinatorLogEntry.getId() + "."+dn.getDatabase();
SQLJob sqlJob = new SQLJob(xacmd, dn.getDatabase(), resultHandler, dn.getDbPool().getSource());
sqlJob.run();
LOGGER.debug(String.format("[%s] Host:[%s] schema:[%s]", xacmd, dn.getName(), dn.getDatabase()));
@@ -46,6 +46,7 @@ import io.mycat.net.mysql.HandshakePacket;
import io.mycat.net.mysql.MySQLPacket;
import io.mycat.net.mysql.QuitPacket;
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.NonBlockingSession;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;
import io.mycat.util.TimeUtil;
@@ -393,13 +394,16 @@ public class MySQLConnection extends BackendAIOConnection {
if (!modifiedSQLExecuted && rrn.isModifySQL()) {
modifiedSQLExecuted = true;
}
String xaTXID = sc.getSession2().getXaTXID();
String xaTXID = getConnXID(sc.getSession2());
if (!sc.isAutocommit() && !sc.isTxstart() && modifiedSQLExecuted) {
sc.setTxstart(true);
}
synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit);
}
public String getConnXID(NonBlockingSession session) {
return session.getSessionXaID() + "." + this.schema;
}
private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
int clientCharSetIndex, int clientTxIsoLation,
boolean expectAutocommit) {
@@ -33,27 +33,27 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_STARTED_STATE:
if (participantLogEntry == null) {
participantLogEntry = new ParticipantLogEntry[nodeCount];
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getXaTXID(), participantLogEntry, session.getXaState());
XAStateLog.flushMemoryRepository(session.getXaTXID(), coordinatorLogEntry);
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getSessionXaID(), participantLogEntry, session.getXaState());
XAStateLog.flushMemoryRepository(session.getSessionXaID(), coordinatorLogEntry);
}
XAStateLog.initRecoverylog(session.getXaTXID(), position, mysqlCon);
XAStateLog.initRecoverylog(session.getSessionXaID(), position, mysqlCon);
endPhase(mysqlCon);
break;
case TX_ENDED_STATE:
if (position == 0) {
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_PREPARING_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_PREPARING_STATE);
}
preparePhase(mysqlCon);
break;
case TX_PREPARED_STATE:
if (position == 0) {
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_COMMITING_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_COMMITING_STATE);
}
commitPhase(mysqlCon);
break;
case TX_COMMIT_FAILED_STATE:
if (position == 0) {
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_COMMIT_FAILED_STATE);
}
commitPhase(mysqlCon);
break;
@@ -61,12 +61,12 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
}
private void endPhase(MySQLConnection mysqlCon) {
String xaTxId = session.getXaTXID();
String xaTxId = mysqlCon.getConnXID(session);
mysqlCon.execCmd("XA END " + xaTxId);
}
private void preparePhase(MySQLConnection mysqlCon) {
String xaTxId = session.getXaTXID();
String xaTxId = mysqlCon.getConnXID(session);
mysqlCon.execCmd("XA PREPARE " + xaTxId);
}
@@ -80,7 +80,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
return;
}
}
String xaTxId = session.getXaTXID();
String xaTxId = mysqlCon.getConnXID(session);
mysqlCon.execCmd("XA COMMIT " + xaTxId);
}
@@ -91,7 +91,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
// END OK
case TX_STARTED_STATE:
mysqlCon.setXaStatus(TxState.TX_ENDED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
@@ -100,7 +100,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
//PREPARE OK
case TX_ENDED_STATE:
mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if(session.getXaState()==TxState.TX_ENDED_STATE){
session.setXaState(TxState.TX_PREPARED_STATE);
@@ -113,7 +113,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_PREPARED_STATE:
// XA reset status now
mysqlCon.setXaStatus(TxState.TX_COMMITED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if(session.getXaState()==TxState.TX_PREPARED_STATE){
@@ -140,7 +140,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_STARTED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
@@ -150,7 +150,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_ENDED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if(session.getXaState()==TxState.TX_ENDED_STATE){
session.setXaState(TxState.TX_PREPARED_STATE);
@@ -163,7 +163,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_PREPARED_STATE:
//TODO :服务降级?
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
@@ -187,7 +187,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_STARTED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
@@ -196,7 +196,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
// 'xa prepare' connectionError
case TX_ENDED_STATE:
mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE);
if (decrementCountBy(1)) {
nextParse();
@@ -207,7 +207,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_PREPARED_STATE:
//TODO :服务降级?
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
@@ -229,7 +229,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_STARTED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
@@ -238,7 +238,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
// 'xa prepare' connectionClose,conn has quit
case TX_ENDED_STATE:
mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE);
if (decrementCountBy(1)) {
nextParse();
@@ -249,7 +249,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_PREPARED_STATE:
//TODO :服务降级?
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
@@ -273,7 +273,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
switch (session.getXaState()) {
case TX_INITIALIZE_STATE:
// clear all resources
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_COMMITED_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_COMMITED_STATE);
byte[] send = sendData;
session.clearResources(false);
if (session.closed()) {
@@ -285,7 +285,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
case TX_COMMIT_FAILED_STATE:
MySQLConnection errConn = session.releaseExcept(TxState.TX_COMMIT_FAILED_STATE);
if (errConn != null) {
XAStateLog.saveXARecoverylog(session.getXaTXID(), session.getXaState());
XAStateLog.saveXARecoverylog(session.getSessionXaID(), session.getXaState());
if (++try_commit_times < COMMIT_TIMES) {
// 多试几次
commit();
@@ -295,7 +295,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
MycatServer.getInstance().getXaSessionCheck().addCommitSession(session);
}
} else {
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_COMMITED_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_COMMITED_STATE);
session.setXaState(TxState.TX_INITIALIZE_STATE);
byte[] toSend = sendData;
session.clearResources(false);
@@ -306,7 +306,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
break;
// need to rollback;
default:
XAStateLog.saveXARecoverylog(session.getXaTXID(), session.getXaState());
XAStateLog.saveXARecoverylog(session.getSessionXaID(), session.getXaState());
createErrPkg(error).write(session.getSource());
break;
}
@@ -57,21 +57,21 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
@Override
protected void executeRollback(MySQLConnection mysqlCon, int position) {
if(position==0 && participantLogEntry != null){
XAStateLog.saveXARecoverylog(session.getXaTXID(), session.getXaState());
XAStateLog.saveXARecoverylog(session.getSessionXaID(), session.getXaState());
}
switch (session.getXaState()) {
case TX_STARTED_STATE:
if (participantLogEntry == null) {
participantLogEntry = new ParticipantLogEntry[nodeCount];
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getXaTXID(), participantLogEntry, session.getXaState());
XAStateLog.flushMemoryRepository(session.getXaTXID(), coordinatorLogEntry);
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getSessionXaID(), participantLogEntry, session.getXaState());
XAStateLog.flushMemoryRepository(session.getSessionXaID(), coordinatorLogEntry);
}
XAStateLog.initRecoverylog(session.getXaTXID(), position, mysqlCon);
XAStateLog.initRecoverylog(session.getSessionXaID(), position, mysqlCon);
endPhase(mysqlCon);
break;
case TX_PREPARED_STATE:
if(position==0){
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_ROLLBACKING_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_ROLLBACKING_STATE);
}
case TX_ROLLBACK_FAILED_STATE:
case TX_PREPARE_UNCONNECT_STATE:
@@ -84,7 +84,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
private void endPhase(MySQLConnection mysqlCon) {
switch (mysqlCon.getXaStatus()) {
case TX_STARTED_STATE:
String xaTxId = session.getXaTXID();
String xaTxId = mysqlCon.getConnXID(session);
mysqlCon.execCmd("XA END " + xaTxId + ";");
break;
case TX_CONN_QUIT:
@@ -111,7 +111,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
}
case TX_ENDED_STATE:
case TX_PREPARED_STATE:
String xaTxId = session.getXaTXID();
String xaTxId = mysqlCon.getConnXID(session);
mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
break;
case TX_CONN_QUIT:
@@ -133,7 +133,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
// 'xa end' ok
case TX_STARTED_STATE:
mysqlCon.setXaStatus(TxState.TX_ENDED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
@@ -142,7 +142,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
// 'xa rollback' ok without prepared
case TX_ENDED_STATE:
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
@@ -155,7 +155,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_PREPARE_UNCONNECT_STATE:
case TX_ROLLBACK_FAILED_STATE:
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if(session.getXaState()==TxState.TX_PREPARED_STATE){
@@ -179,7 +179,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_STARTED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
@@ -189,7 +189,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_ENDED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
@@ -200,7 +200,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_PREPARED_STATE:
mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE);
mysqlCon.quit();
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
@@ -213,7 +213,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
if (errPacket.errno == ErrorCode.ER_XAER_NOTA) {
//ERROR 1397 (XAE04): XAER_NOTA: Unknown XID, not prepared
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if (session.getXaState() == TxState.TX_PREPARED_STATE) {
@@ -223,7 +223,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
}
} else {
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
cleanAndFeedback();
}
@@ -244,7 +244,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_STARTED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
@@ -253,7 +253,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_ENDED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
@@ -263,7 +263,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_PREPARED_STATE:
mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE);
mysqlCon.quit();
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
@@ -272,7 +272,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
// we dont' konw if the conn prepared or not
case TX_PREPARE_UNCONNECT_STATE:
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
cleanAndFeedback();
}
@@ -293,7 +293,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_STARTED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
@@ -302,7 +302,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_ENDED_STATE:
mysqlCon.quit();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
@@ -312,7 +312,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_PREPARED_STATE:
mysqlCon.setXaStatus(TxState.TX_ROLLBACK_FAILED_STATE);
mysqlCon.quit();
XAStateLog.saveXARecoverylog(session.getXaTXID(), mysqlCon);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
cleanAndFeedback();
@@ -329,7 +329,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
// rollbak success
case TX_INITIALIZE_STATE:
// clear all resources
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_ROLLBACKED_STATE);
byte[] send = sendData;
session.clearResources(false);
if (session.closed()) {
@@ -341,7 +341,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
case TX_ROLLBACK_FAILED_STATE:
MySQLConnection errConn = session.releaseExcept(TxState.TX_ROLLBACK_FAILED_STATE);
if (errConn != null) {
XAStateLog.saveXARecoverylog(session.getXaTXID(), session.getXaState());
XAStateLog.saveXARecoverylog(session.getSessionXaID(), session.getXaState());
if (++try_rollback_times < ROLLBACK_TIMES) {
// 多试几次
rollback();
@@ -352,7 +352,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
MycatServer.getInstance().getXaSessionCheck().addRollbackSession(session);
}
} else {
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_ROLLBACKED_STATE);
session.setXaState(TxState.TX_INITIALIZE_STATE);
byte[] toSend = sendData;
session.clearResources(false);
@@ -364,7 +364,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler{
// rollbak success,but closed coon must remove
default:
removeQuitConn();
XAStateLog.saveXARecoverylog(session.getXaTXID(), TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoverylog(session.getSessionXaID(), TxState.TX_ROLLBACKED_STATE);
session.setXaState(TxState.TX_INITIALIZE_STATE);
session.clearResources(false);
if (session.closed()) {
@@ -128,7 +128,7 @@ public class NonBlockingSession implements Session {
"No dataNode found ,please check tables defined in schema:" + source.getSchema());
return;
}
if (this.getXaTXID() != null && this.xaState == TxState.TX_INITIALIZE_STATE) {
if (this.getSessionXaID() != null && this.xaState == TxState.TX_INITIALIZE_STATE) {
this.xaState = TxState.TX_STARTED_STATE;
}
if (nodes.length == 1) {
@@ -162,16 +162,16 @@ public class NonBlockingSession implements Session {
private CommitNodesHandler createCommitNodesHandler() {
if (commitHandler == null) {
if (this.getXaTXID() == null) {
if (this.getSessionXaID() == null) {
commitHandler = new NormalCommitNodesHandler(this);
} else {
commitHandler = new XACommitNodesHandler(this);
}
} else {
if (this.getXaTXID() == null && (commitHandler instanceof XACommitNodesHandler)) {
if (this.getSessionXaID() == null && (commitHandler instanceof XACommitNodesHandler)) {
commitHandler = new NormalCommitNodesHandler(this);
}
if (this.getXaTXID() != null && (commitHandler instanceof NormalCommitNodesHandler)) {
if (this.getSessionXaID() != null && (commitHandler instanceof NormalCommitNodesHandler)) {
commitHandler = new XACommitNodesHandler(this);
}
}
@@ -193,16 +193,16 @@ public class NonBlockingSession implements Session {
private RollbackNodesHandler createRollbackNodesHandler() {
if (rollbackHandler == null) {
if (this.getXaTXID() == null) {
if (this.getSessionXaID() == null) {
rollbackHandler = new NormalRollbackNodesHandler(this);
} else {
rollbackHandler = new XARollbackNodesHandler(this);
}
} else {
if (this.getXaTXID() == null && (rollbackHandler instanceof XARollbackNodesHandler)) {
if (this.getSessionXaID() == null && (rollbackHandler instanceof XARollbackNodesHandler)) {
rollbackHandler = new NormalRollbackNodesHandler(this);
}
if (this.getXaTXID() != null && (rollbackHandler instanceof NormalRollbackNodesHandler)) {
if (this.getSessionXaID() != null && (rollbackHandler instanceof NormalRollbackNodesHandler)) {
rollbackHandler = new XARollbackNodesHandler(this);
}
}
@@ -503,7 +503,7 @@ public class NonBlockingSession implements Session {
}
}
public String getXaTXID() {
public String getSessionXaID() {
return xaTXID;
}
@@ -81,7 +81,7 @@ public final class SetHandler {
break;
}
case XA_FLAG_ON: {
if (c.isTxstart() && c.getSession2().getXaTXID() == null) {
if (c.isTxstart() && c.getSession2().getSessionXaID() == null) {
c.writeErrMessage(ErrorCode.ERR_WRONG_USED, "set xa cmd on can't used before ending a transaction");
return;
}
@@ -90,7 +90,7 @@ public final class SetHandler {
break;
}
case XA_FLAG_OFF: {
if (c.isTxstart() && c.getSession2().getXaTXID() != null) {
if (c.isTxstart() && c.getSession2().getSessionXaID() != null) {
c.writeErrMessage(ErrorCode.ERR_WRONG_USED, "set xa cmd off can't used before ending a transaction");
return;
}