issue_implicit_dts_bug (#1395)

This commit is contained in:
tiger.yan
2019-09-19 00:56:15 -05:00
committed by yanhuqing
parent 015b0c9020
commit b33a608c19
16 changed files with 316 additions and 196 deletions

View File

@@ -1 +0,0 @@
dn1:10;dn2:20;dn3:30;dn4:40

View File

@@ -334,7 +334,7 @@ public abstract class PhysicalDatasource {
}
private void createNewConnection(final ResponseHandler handler, final Object attachment,
final String schema) throws IOException {
final String schema) {
// aysn create connection
DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() {
public void run() {

View File

@@ -19,6 +19,7 @@ import com.actiontech.dble.server.parser.ServerParse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
/**
@@ -36,11 +37,12 @@ public class LockTablesHandler extends MultiNodeHandler {
public LockTablesHandler(NonBlockingSession session, RouteResultset rrs) {
super(session);
this.rrs = rrs;
unResponseRrns.addAll(Arrays.asList(rrs.getNodes()));
this.autocommit = session.getSource().isAutocommit();
}
public void execute() throws Exception {
super.reset(this.rrs.getNodes().length);
super.reset();
for (final RouteResultsetNode node : rrs.getNodes()) {
BackendConnection conn = session.getTarget(node);
if (session.tryExistsCon(conn, node)) {
@@ -76,7 +78,8 @@ public class LockTablesHandler extends MultiNodeHandler {
session.releaseConnectionIfSafe(conn, false);
} else {
conn.closeWithoutRsp("unfinished sync");
session.getTargetMap().remove(conn.getAttachment());
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
session.getTargetMap().remove(rNode);
}
ErrorPacket errPacket = new ErrorPacket();
errPacket.read(err);
@@ -85,7 +88,7 @@ public class LockTablesHandler extends MultiNodeHandler {
setFail(errMsg);
}
LOGGER.info("error response from " + conn + " err " + errMsg + " code:" + errPacket.getErrNo());
this.tryErrorFinished(this.decrementCountBy(1));
this.tryErrorFinished(this.decrementToZero(conn));
}
@Override
@@ -95,7 +98,7 @@ public class LockTablesHandler extends MultiNodeHandler {
if (clearIfSessionClosed(session)) {
return;
}
boolean isEndPack = decrementCountBy(1);
boolean isEndPack = decrementToZero(conn);
final RouteResultsetNode node = (RouteResultsetNode) conn.getAttachment();
if (node.getSqlType() == ServerParse.UNLOCK) {
session.releaseConnection(conn);

View File

@@ -63,8 +63,9 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
}
protected void reset(int initCount) {
super.reset(initCount);
@Override
protected void reset() {
super.reset();
}
public NonBlockingSession getSession() {
@@ -74,7 +75,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
public void execute() throws Exception {
lock.lock();
try {
this.reset(rrs.getNodes().length);
this.reset();
} finally {
lock.unlock();
}
@@ -82,6 +83,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
unResponseRrns.add(node);
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
@@ -127,7 +129,14 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
errPacket.setMessage(StringUtil.encode(reason, session.getSource().getCharset().getResults()));
err = errPacket;
executeConnError(conn);
lock.lock();
try {
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
unResponseRrns.remove(rNode);
executeConnError();
} finally {
lock.unlock();
}
}
private boolean checkClosedConn(BackendConnection conn) {
@@ -151,21 +160,15 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
}
}
private void executeConnError(BackendConnection conn) {
lock.lock();
try {
if (!isFail()) {
setFail(new String(err.getMessage()));
private void executeConnError() {
if (!isFail()) {
setFail(new String(err.getMessage()));
}
if (canResponse() && errorResponse.compareAndSet(false, true)) {
if (releaseDDLLock.compareAndSet(false, true)) {
session.handleSpecial(oriRrs, false);
}
if (--nodeCount <= 0 && errorResponse.compareAndSet(false, true)) {
if (releaseDDLLock.compareAndSet(false, true)) {
session.handleSpecial(oriRrs, false);
}
handleRollbackPacket(err.toBytes());
}
} finally {
lock.unlock();
handleRollbackPacket(err.toBytes());
}
}
@@ -178,7 +181,13 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
errPacket.setMessage(StringUtil.encode(e.toString(), session.getSource().getCharset().getResults()));
err = errPacket;
executeConnError(conn);
lock.lock();
try {
errorConnsCnt++;
executeConnError();
} finally {
lock.unlock();
}
}
@Override
@@ -200,10 +209,10 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
if (!isFail()) {
setFail(new String(errPacket.getMessage()));
}
if (--nodeCount > 0)
return;
session.handleSpecial(oriRrs, false);
handleRollbackPacket(err.toBytes());
if (canResponse()) {
session.handleSpecial(oriRrs, false);
handleRollbackPacket(err.toBytes());
}
} finally {
lock.unlock();
}
@@ -232,7 +241,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
lock.lock();
try {
((MySQLConnection) conn).setTesting(false);
if (--nodeCount > 0)
if (!decrementToZero(conn))
return;
if (this.isFail()) {

View File

@@ -8,11 +8,14 @@ package com.actiontech.dble.backend.mysql.nio.handler;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -27,6 +30,9 @@ public abstract class MultiNodeHandler implements ResponseHandler {
protected volatile String error;
protected byte packetId;
protected final AtomicBoolean errorResponse = new AtomicBoolean(false);
protected Set<RouteResultsetNode> unResponseRrns = new HashSet<>();
protected int errorConnsCnt = 0;
protected boolean firstResponsed = false;
public MultiNodeHandler(NonBlockingSession session) {
if (session == null) {
@@ -44,12 +50,18 @@ public abstract class MultiNodeHandler implements ResponseHandler {
return isFailed.get();
}
protected int nodeCount;
public void connectionError(Throwable e, BackendConnection conn) {
this.setFail("backend connect: " + e);
LOGGER.info("backend connect", e);
this.tryErrorFinished(decrementCountBy(1));
boolean finished;
lock.lock();
try {
errorConnsCnt++;
finished = canResponse();
} finally {
lock.unlock();
}
this.tryErrorFinished(finished);
}
@@ -84,20 +96,23 @@ public abstract class MultiNodeHandler implements ResponseHandler {
}
protected boolean decrementCountBy(int finished) {
protected boolean decrementToZero(BackendConnection conn) {
boolean zeroReached;
lock.lock();
try {
nodeCount -= finished;
zeroReached = nodeCount == 0;
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
unResponseRrns.remove(rNode);
zeroReached = canResponse();
} finally {
lock.unlock();
}
return zeroReached;
}
protected void reset(int initCount) {
nodeCount = initCount;
protected void reset() {
errorConnsCnt = 0;
firstResponsed = false;
unResponseRrns.clear();
isFailed.set(false);
error = null;
packetId = (byte) session.getPacketId().get();
@@ -116,6 +131,17 @@ public abstract class MultiNodeHandler implements ResponseHandler {
return err;
}
protected boolean canResponse() {
if (firstResponsed) {
return false;
}
if (unResponseRrns.size() == errorConnsCnt) {
firstResponsed = true;
return true;
}
return false;
}
protected void tryErrorFinished(boolean allEnd) {
if (allEnd && !session.closed()) {
// clear session resources,release all
@@ -140,21 +166,10 @@ public abstract class MultiNodeHandler implements ResponseHandler {
public void connectionClose(BackendConnection conn, String reason) {
this.setFail("closed connection:" + reason + " con:" + conn);
boolean finished;
lock.lock();
try {
finished = (this.nodeCount == 0);
} finally {
lock.unlock();
}
if (!finished) {
finished = this.decrementCountBy(1);
}
if (error == null) {
error = "back connection closed ";
}
tryErrorFinished(finished);
tryErrorFinished(decrementToZero(conn));
}
public void clearResources() {

View File

@@ -34,6 +34,9 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* @author mycat
@@ -58,6 +61,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
private volatile ByteBuffer byteBuffer;
private Set<BackendConnection> closedConnSet;
private final boolean modifiedSQL;
protected Set<RouteResultsetNode> connRrns = new ConcurrentSkipListSet<>();
private Map<String, Integer> dataNodePauseInfo; // only for debug
public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
@@ -77,11 +81,13 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
initDebugInfo();
}
protected void reset(int initCount) {
super.reset(initCount);
@Override
protected void reset() {
super.reset();
if (rrs.isLoadData()) {
packetId = session.getSource().getLoadDataInfileHandler().getLastPackId();
}
connRrns.clear();
this.netOutBytes = 0;
this.resultSize = 0;
}
@@ -93,7 +99,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
public void execute() throws Exception {
lock.lock();
try {
this.reset(rrs.getNodes().length);
this.reset();
this.fieldsReturned = false;
this.affectedRows = 0L;
this.insertId = 0L;
@@ -103,6 +109,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
unResponseRrns.add(node);
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
@@ -116,6 +123,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
node.setRunOnSlave(rrs.getRunOnSlave());
innerExecute(conn, node);
} else {
connRrns.add(node);
// create new connection
node.setRunOnSlave(rrs.getRunOnSlave());
PhysicalDBNode dn = DbleServer.getInstance().getConfig().getDataNodes().get(node.getName());
@@ -150,7 +158,14 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
errPacket.setMessage(StringUtil.encode(reason, session.getSource().getCharset().getResults()));
err = errPacket;
session.resetMultiStatementStatus();
executeError(conn);
lock.lock();
try {
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
unResponseRrns.remove(rNode);
executeError(conn);
} finally {
lock.unlock();
}
}
@Override
@@ -164,13 +179,20 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
errPacket.setMessage(StringUtil.encode(errMsg, session.getSource().getCharset().getResults()));
err = errPacket;
session.resetMultiStatementStatus();
executeError(conn);
lock.lock();
try {
errorConnsCnt++;
executeError(conn);
} finally {
lock.unlock();
}
}
@Override
public void connectionAcquired(final BackendConnection conn) {
final RouteResultsetNode node = (RouteResultsetNode) conn.getAttachment();
session.bindConnection(node, conn);
connRrns.remove(node);
innerExecute(conn, node);
}
@@ -192,7 +214,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
errConnection = new ArrayList<>();
}
errConnection.add(conn);
if (--nodeCount == 0) {
if (decrementToZero(conn)) {
session.handleSpecial(rrs, false, getDDLErrorInfo());
packetId++;
if (byteBuffer != null) {
@@ -228,10 +250,9 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
affectedRows = ok.getAffectedRows();
}
if (ok.getInsertId() > 0) {
insertId = (insertId == 0) ? ok.getInsertId() : Math.min(
insertId, ok.getInsertId());
insertId = (insertId == 0) ? ok.getInsertId() : Math.min(insertId, ok.getInsertId());
}
if (--nodeCount > 0)
if (!decrementToZero(conn))
return;
if (isFail()) {
session.handleSpecial(rrs, false);
@@ -318,6 +339,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
if (errorResponse.get()) {
return;
}
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
final ServerConnection source = session.getSource();
if (!rrs.isCallStatement()) {
if (clearIfSessionClosed(session)) {
@@ -326,8 +348,15 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
session.releaseConnectionIfSafe(conn, false);
}
}
if (decrementCountBy(1)) {
boolean zeroReached;
lock.lock();
try {
unResponseRrns.remove(rNode);
zeroReached = canResponse();
} finally {
lock.unlock();
}
if (zeroReached) {
this.resultSize += eof.length;
if (!rrs.isCallStatement()) {
if (this.sessionAutocommit && !session.getSource().isTxStart() && !session.getSource().isLocked()) { // clear all connections
@@ -444,27 +473,22 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
private void executeError(BackendConnection conn) {
lock.lock();
try {
if (!isFail()) {
setFail(new String(err.getMessage()));
if (!isFail()) {
setFail(new String(err.getMessage()));
}
if (errConnection == null) {
errConnection = new ArrayList<>();
}
errConnection.add(conn);
if (canResponse()) {
session.handleSpecial(rrs, false);
packetId++;
if (byteBuffer == null) {
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn, false);
} else {
session.getSource().write(byteBuffer);
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn, false);
}
if (errConnection == null) {
errConnection = new ArrayList<>();
}
errConnection.add(conn);
if (--nodeCount == 0) {
session.handleSpecial(rrs, false);
packetId++;
if (byteBuffer == null) {
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn, false);
} else {
session.getSource().write(byteBuffer);
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn, false);
}
}
} finally {
lock.unlock();
}
}
@@ -576,9 +600,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
void handleEndPacket(byte[] data, AutoTxOperation txOperation, BackendConnection conn, boolean isSuccess) {
ServerConnection source = session.getSource();
if (source.isAutocommit() && !source.isTxStart() && this.modifiedSQL) {
if (nodeCount < 0) {
return;
}
//Implicit Distributed Transaction,send commit or rollback automatically
if (txOperation == AutoTxOperation.COMMIT) {
if (!conn.isDDL()) {
@@ -609,19 +630,21 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
}
if (nodeCount == 0) {
// Explicit Distributed Transaction
if (inTransaction && (AutoTxOperation.ROLLBACK == txOperation)) {
source.setTxInterrupt("ROLLBACK");
}
session.setResponseTime(isSuccess);
session.getSource().write(data);
session.multiStatementNextSql(session.getIsMultiStatement().get());
// Explicit Distributed Transaction
if (inTransaction && (AutoTxOperation.ROLLBACK == txOperation)) {
source.setTxInterrupt("ROLLBACK");
}
session.setResponseTime(isSuccess);
session.getSource().write(data);
session.multiStatementNextSql(session.getIsMultiStatement().get());
}
}
public void waitAllConnConnectorError() {
while (connRrns.size() - 1 != errorConnsCnt) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
}
}
private void initDebugInfo() {
if (LOGGER.isDebugEnabled()) {

View File

@@ -63,20 +63,18 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
lock.lock();
try {
if (isFail()) {
if (--nodeCount > 0) {
return;
if (decrementToZero(conn)) {
session.resetMultiStatementStatus();
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn, false);
}
session.resetMultiStatementStatus();
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn, false);
} else {
if (!fieldsReturned) {
fieldsReturned = true;
mergeFieldEof(fields, conn);
}
if (--nodeCount > 0) {
return;
if (decrementToZero(conn)) {
startOwnThread();
}
startOwnThread();
}
} catch (Exception e) {
handleDataProcessException(e);

View File

@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author mycat
@@ -54,6 +55,7 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
private int fieldCount;
private List<FieldPacket> fieldPackets = new ArrayList<>();
private volatile boolean connClosed = false;
private AtomicBoolean writeToClient = new AtomicBoolean(false);
public SingleNodeHandler(RouteResultset rrs, NonBlockingSession session) {
this.rrs = rrs;
@@ -164,14 +166,15 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
source.setTxInterrupt(errMsg);
session.handleSpecial(rrs, false);
if (buffer != null) {
/* SELECT 9223372036854775807 + 1; response: field_count, field, eof, err */
buffer = source.writeToBuffer(errPkg.toBytes(), allocBuffer());
session.setResponseTime(false);
source.write(buffer);
} else {
errPkg.write(source);
if (writeToClient.compareAndSet(false, true)) {
if (buffer != null) {
/* SELECT 9223372036854775807 + 1; response: field_count, field, eof, err */
buffer = source.writeToBuffer(errPkg.toBytes(), allocBuffer());
session.setResponseTime(false);
source.write(buffer);
} else {
errPkg.write(source);
}
}
}
@@ -215,7 +218,9 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
session.multiStatementPacket(ok, packetId);
boolean multiStatementFlag = session.getIsMultiStatement().get();
doSqlStat();
ok.write(source);
if (rrs.isCallStatement() || writeToClient.compareAndSet(false, true)) {
ok.write(source);
}
session.multiStatementNextSql(multiStatementFlag);
}
}
@@ -233,7 +238,9 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
session.multiStatementPacket(errPacket, packetId);
boolean multiStatementFlag = session.getIsMultiStatement().get();
doSqlStat();
errPacket.write(session.getSource());
if (writeToClient.compareAndSet(false, true)) {
errPacket.write(session.getSource());
}
session.multiStatementNextSql(multiStatementFlag);
}
@@ -259,7 +266,9 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
session.setResponseTime(true);
boolean multiStatementFlag = session.getIsMultiStatement().get();
doSqlStat();
source.write(buffer);
if (writeToClient.compareAndSet(false, true)) {
source.write(buffer);
}
session.multiStatementNextSql(multiStatementFlag);
}

View File

@@ -16,6 +16,7 @@ import com.actiontech.dble.server.parser.ServerParse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,7 +40,7 @@ public class UnLockTablesHandler extends MultiNodeHandler implements ResponseHan
public void execute() {
Map<RouteResultsetNode, BackendConnection> lockedCons = session.getTargetMap();
this.reset(lockedCons.size());
this.reset();
// if client just send an unlock tables, theres is no lock tables statement, just send back OK
if (lockedCons.size() == 0) {
LOGGER.info("find no locked backend connection!" + session.getSource());
@@ -50,10 +51,17 @@ public class UnLockTablesHandler extends MultiNodeHandler implements ResponseHan
ok.write(session.getSource());
return;
}
Map<RouteResultsetNode, BackendConnection> forUnlocks = new HashMap<>(lockedCons.size());
for (Map.Entry<RouteResultsetNode, BackendConnection> entry : lockedCons.entrySet()) {
RouteResultsetNode dataNode = entry.getKey();
BackendConnection conn = entry.getValue();
RouteResultsetNode node = new RouteResultsetNode(dataNode.getName(), ServerParse.UNLOCK, srcStatement);
BackendConnection conn = lockedCons.get(dataNode);
forUnlocks.put(node, conn);
unResponseRrns.add(node);
}
for (Map.Entry<RouteResultsetNode, BackendConnection> entry : forUnlocks.entrySet()) {
RouteResultsetNode node = entry.getKey();
BackendConnection conn = entry.getValue();
if (clearIfSessionClosed(session)) {
return;
}
@@ -94,14 +102,14 @@ public class UnLockTablesHandler extends MultiNodeHandler implements ResponseHan
}
LOGGER.info("error response from " + conn + " err " + errMsg + " code:" + errPacket.getErrNo());
this.tryErrorFinished(this.decrementCountBy(1));
this.tryErrorFinished(this.decrementToZero(conn));
}
@Override
public void okResponse(byte[] data, BackendConnection conn) {
boolean executeResponse = conn.syncAndExecute();
if (executeResponse) {
boolean isEndPack = decrementCountBy(1);
boolean isEndPack = decrementToZero(conn);
session.releaseConnection(conn);
if (isEndPack) {
if (this.isFail() || session.closed()) {

View File

@@ -61,6 +61,14 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem
public void writeQueueAvailable() {
}
@Override
public void reset() {
errorConnsCnt = 0;
firstResponsed = false;
unResponseRrns.clear();
packetId = 0;
}
public void debugCommitDelay() {
}

View File

@@ -18,23 +18,21 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void commit() {
final int initCount = session.getTargetCount();
if (initCount <= 0) {
if (session.getTargetCount() <= 0) {
if (implictCommitHandler == null && sendData == null) {
sendData = session.getOkByteArray();
}
cleanAndFeedback();
return;
}
lock.lock();
try {
reset(initCount);
reset();
} finally {
lock.unlock();
}
int position = 0;
unResponseRrns.addAll(session.getTargetKeys());
for (RouteResultsetNode rrn : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(rrn);
conn.setResponseHandler(this);
@@ -65,7 +63,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
if (implictCommitHandler == null && sendData == null) {
sendData = session.getOkByteArray();
}
@@ -80,17 +78,25 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
String errMsg = new String(errPacket.getMessage());
this.setFail(errMsg);
conn.close("commit response error");
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}
// should be not happen
@Override
public void connectionError(Throwable e, BackendConnection conn) {
LOGGER.info("backend connect", e);
this.setFail(e.getMessage());
conn.close("Commit connection Error");
if (decrementCountBy(1)) {
boolean finished;
lock.lock();
try {
errorConnsCnt++;
finished = canResponse();
} finally {
lock.unlock();
}
if (finished) {
cleanAndFeedback();
}
}
@@ -102,7 +108,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
}
this.setFail(reason);
conn.close("commit connection closed");
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}

View File

@@ -30,28 +30,26 @@ public class NormalRollbackNodesHandler extends AbstractRollbackNodesHandler {
}
public void rollback() {
final int initCount = session.getTargetCount();
lock.lock();
try {
reset(initCount);
reset();
} finally {
lock.unlock();
}
int position = 0;
for (final RouteResultsetNode node : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(node);
if (conn.isClosed()) {
lock.lock();
try {
nodeCount--;
} finally {
lock.unlock();
}
continue;
if (!conn.isClosed()) {
unResponseRrns.add(node);
}
}
for (final RouteResultsetNode node : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(node);
if (!conn.isClosed()) {
position++;
conn.setResponseHandler(this);
conn.rollback();
}
position++;
conn.setResponseHandler(this);
conn.rollback();
}
if (position == 0) {
if (sendData == null) {
@@ -63,7 +61,7 @@ public class NormalRollbackNodesHandler extends AbstractRollbackNodesHandler {
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
if (sendData == null) {
sendData = session.getOkByteArray();
}
@@ -78,18 +76,27 @@ public class NormalRollbackNodesHandler extends AbstractRollbackNodesHandler {
String errMsg = new String(errPacket.getMessage());
this.setFail(errMsg);
conn.close("rollback error response"); //quit to rollback
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}
// should be not happen
@Override
public void connectionError(Throwable e, BackendConnection conn) {
LOGGER.info("backend connect", e);
String errMsg = new String(StringUtil.encode(e.getMessage(), session.getSource().getCharset().getResults()));
this.setFail(errMsg);
conn.close("rollback connection error"); //quit if not rollback
if (decrementCountBy(1)) {
boolean finished;
lock.lock();
try {
errorConnsCnt++;
finished = canResponse();
} finally {
lock.unlock();
}
if (finished) {
cleanAndFeedback();
}
}
@@ -98,7 +105,7 @@ public class NormalRollbackNodesHandler extends AbstractRollbackNodesHandler {
public void connectionClose(BackendConnection conn, String reason) {
// quitted
this.setFail(reason);
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}

View File

@@ -57,16 +57,15 @@ public class SavePointHandler extends MultiNodeHandler {
return;
}
final int initCount = session.getTargetCount();
lock.lock();
try {
reset(initCount);
reset();
} finally {
lock.unlock();
}
SavePoint latestSp = savepoints.getPrev();
if (latestSp == null || initCount > latestSp.getRouteNodes().size()) {
if (latestSp == null || session.getTargetCount() > latestSp.getRouteNodes().size()) {
newSp.setRouteNodes(new HashSet<>(session.getTargetKeys()));
} else {
newSp.setRouteNodes(latestSp.getRouteNodes());
@@ -93,10 +92,9 @@ public class SavePointHandler extends MultiNodeHandler {
return;
}
final int initCount = session.getTargetCount();
lock.lock();
try {
reset(initCount);
reset();
} finally {
lock.unlock();
}
@@ -166,7 +164,7 @@ public class SavePointHandler extends MultiNodeHandler {
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}
@@ -178,7 +176,7 @@ public class SavePointHandler extends MultiNodeHandler {
String errMsg = new String(errPacket.getMessage());
LOGGER.warn("get error package, content is:" + errMsg);
this.setFail(errMsg);
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}
@@ -189,7 +187,7 @@ public class SavePointHandler extends MultiNodeHandler {
String errMsg = "Connection {DataHost[" + conn.getHost() + ":" + conn.getPort() + "],Schema[" + conn.getSchema() + "],threadID[" +
((MySQLConnection) conn).getThreadId() + "]} was closed ,reason is [" + reason + "]";
this.setFail(errMsg);
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}
@@ -199,7 +197,7 @@ public class SavePointHandler extends MultiNodeHandler {
LOGGER.warn("backend connect err:", e);
this.setFail(e.getMessage());
conn.close("savepoint connection Error");
if (decrementCountBy(1)) {
if (decrementToZero(conn)) {
cleanAndFeedback();
}
}

View File

@@ -35,6 +35,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
private int tryCommitTimes = 0;
private int backgroundCommitTimes = 0;
private ParticipantLogEntry[] participantLogEntry = null;
private int participantLogSize = 0;
byte[] sendData = OkPacket.OK;
private Lock lockForErrorHandle = new ReentrantLock();
@@ -49,16 +50,15 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void commit() {
final int initCount = session.getTargetCount();
if (initCount <= 0) {
participantLogSize = session.getTargetCount();
if (participantLogSize <= 0) {
session.getSource().write(session.getOkByteArray());
session.multiStatementNextSql(session.getIsMultiStatement().get());
return;
}
lock.lock();
try {
reset(initCount);
reset();
} finally {
lock.unlock();
}
@@ -73,6 +73,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
try {
sendFinishedFlag = false;
unResponseRrns.addAll(session.getTargetKeys());
for (RouteResultsetNode rrn : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(rrn);
conn.setResponseHandler(this);
@@ -110,7 +111,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
TxState state = session.getXaState();
if (state == TxState.TX_STARTED_STATE) {
if (participantLogEntry == null) {
participantLogEntry = new ParticipantLogEntry[nodeCount];
participantLogEntry = new ParticipantLogEntry[participantLogSize];
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getSessionXaID(), participantLogEntry, session.getXaState());
XAStateLog.flushMemoryRepository(session.getSessionXaID(), coordinatorLogEntry);
}
@@ -149,7 +150,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
commitPhase(mysqlCon);
} else if (state == TxState.TX_PREPARE_UNCONNECT_STATE) {
LOGGER.warn("commit error and rollback the xa");
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() {
@Override
public void run() {
@@ -193,7 +194,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
if (!newConn.equals(mysqlCon)) {
xaOldThreadIds.putIfAbsent(mysqlCon.getAttachment(), mysqlCon.getThreadId());
mysqlCon = newConn;
} else if (decrementCountBy(1)) {
} else if (decrementToZero(mysqlCon)) {
cleanAndFeedback(false);
return;
}
@@ -212,7 +213,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
if (state == TxState.TX_STARTED_STATE) {
mysqlCon.setXaStatus(TxState.TX_ENDED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
}
@@ -220,7 +221,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
//PREPARE OK
mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
if (session.getXaState() == TxState.TX_ENDED_STATE) {
session.setXaState(TxState.TX_PREPARED_STATE);
}
@@ -232,7 +233,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.setXaStatus(TxState.TX_COMMITTED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
if (session.getXaState() == TxState.TX_PREPARED_STATE) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
}
@@ -255,7 +256,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
}
@@ -265,7 +266,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
if (session.getXaState() == TxState.TX_ENDED_STATE) {
session.setXaState(TxState.TX_PREPARED_STATE);
}
@@ -276,7 +277,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback(false);
}
} else if (mysqlCon.getXaStatus() == TxState.TX_COMMIT_FAILED_STATE) {
@@ -292,7 +293,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.setXaStatus(TxState.TX_COMMITTED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
if (session.getXaState() == TxState.TX_PREPARED_STATE) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
}
@@ -305,7 +306,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback(false);
}
}
@@ -313,7 +314,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback(false);
}
}
@@ -328,7 +329,15 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
String errMsg = new String(StringUtil.encode(e.getMessage(), session.getSource().getCharset().getResults()));
this.setFail(errMsg);
sendData = makeErrorPacket(errMsg);
innerConnectError(conn);
boolean finished;
lock.lock();
try {
errorConnsCnt++;
finished = canResponse();
} finally {
lock.unlock();
}
innerConnectError(conn, finished);
}
@Override
@@ -339,17 +348,17 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
this.setFail(reason);
sendData = makeErrorPacket(reason);
innerConnectError(conn);
innerConnectError(conn, decrementToZero(conn));
}
private void innerConnectError(BackendConnection conn) {
private void innerConnectError(BackendConnection conn, boolean finished) {
if (conn instanceof MySQLConnection) {
MySQLConnection mysqlCon = (MySQLConnection) conn;
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_CONN_QUIT);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (finished) {
session.setXaState(TxState.TX_ENDED_STATE);
nextParse();
}
@@ -358,7 +367,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.setXaStatus(TxState.TX_PREPARE_UNCONNECT_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_PREPARE_UNCONNECT_STATE);
if (decrementCountBy(1)) {
if (finished) {
nextParse();
}
// 'xa commit' connectionClose
@@ -366,7 +375,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
mysqlCon.setXaStatus(TxState.TX_COMMIT_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_COMMIT_FAILED_STATE);
if (decrementCountBy(1)) {
if (finished) {
cleanAndFeedback(false);
}
}

View File

@@ -37,6 +37,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
private int tryRollbackTimes = 0;
private int backgroundRollbackTimes = 0;
private ParticipantLogEntry[] participantLogEntry = null;
private int participantLogSize = 0;
byte[] sendData = OkPacket.OK;
private volatile boolean sendFinishedFlag = false;
@@ -58,10 +59,10 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
public void rollback() {
final int initCount = session.getTargetCount();
participantLogSize = session.getTargetCount();
lock.lock();
try {
reset(initCount);
reset();
} finally {
lock.unlock();
}
@@ -77,6 +78,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
try {
sendFinishedFlag = false;
unResponseRrns.addAll(session.getTargetKeys());
for (final RouteResultsetNode node : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(node);
conn.setResponseHandler(this);
@@ -106,7 +108,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
}
if (session.getXaState() == TxState.TX_STARTED_STATE) {
if (participantLogEntry == null) {
participantLogEntry = new ParticipantLogEntry[nodeCount];
participantLogEntry = new ParticipantLogEntry[participantLogSize];
CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getSessionXaID(), participantLogEntry, session.getXaState());
XAStateLog.flushMemoryRepository(session.getSessionXaID(), coordinatorLogEntry);
}
@@ -147,7 +149,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.execCmd("XA END " + xaTxId + ";");
} else if (mysqlCon.getXaStatus() == TxState.TX_CONN_QUIT) {
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
@@ -165,7 +167,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
String xaTxId = mysqlCon.getConnXID(session, rrn.getMultiplexNum().longValue());
XaDelayProvider.delayBeforeXaRollback(rrn.getName(), xaTxId);
mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
} else if (decrementCountBy(1)) {
} else if (decrementToZero(mysqlCon)) {
cleanAndFeedback();
}
@@ -176,7 +178,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
} else if (mysqlCon.getXaStatus() == TxState.TX_CONN_QUIT || mysqlCon.getXaStatus() == TxState.TX_ROLLBACKED_STATE) {
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback();
}
}
@@ -190,7 +192,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
mysqlCon.setXaStatus(TxState.TX_ENDED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
@@ -199,7 +201,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
}
@@ -211,7 +213,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
if (session.getXaState() == TxState.TX_PREPARED_STATE) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
}
@@ -232,7 +234,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
@@ -242,7 +244,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
}
@@ -252,7 +254,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback();
}
@@ -275,7 +277,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
if (session.getXaState() == TxState.TX_PREPARED_STATE) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
}
@@ -284,14 +286,14 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
} else {
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback();
}
}
} else {
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback();
}
}
@@ -305,12 +307,20 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
public void connectionError(Throwable e, BackendConnection conn) {
this.waitUntilSendFinish();
if (conn instanceof MySQLConnection) {
boolean finished;
lock.lock();
try {
errorConnsCnt++;
finished = canResponse();
} finally {
lock.unlock();
}
MySQLConnection mysqlCon = (MySQLConnection) conn;
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (finished) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
@@ -320,7 +330,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (finished) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
}
@@ -331,14 +341,14 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
if (finished) {
cleanAndFeedback();
}
// we don't know if the conn prepared or not
} else if (mysqlCon.getXaStatus() == TxState.TX_PREPARE_UNCONNECT_STATE) {
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (finished) {
cleanAndFeedback();
}
} else {
@@ -357,7 +367,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_ENDED_STATE);
rollback();
}
@@ -366,7 +376,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
mysqlCon.setXaStatus(TxState.TX_ROLLBACKED_STATE);
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
session.setXaState(TxState.TX_INITIALIZE_STATE);
cleanAndFeedback();
}
@@ -377,7 +387,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
mysqlCon.close();
XAStateLog.saveXARecoveryLog(session.getSessionXaID(), mysqlCon);
session.setXaState(TxState.TX_ROLLBACK_FAILED_STATE);
if (decrementCountBy(1)) {
if (decrementToZero(mysqlCon)) {
cleanAndFeedback();
}
} else {

View File

@@ -523,6 +523,12 @@ public class NonBlockingSession implements Session {
multiNodeHandler.execute();
} catch (Exception e) {
LOGGER.info(String.valueOf(source) + rrs, e);
if (!source.isAutocommit() || source.isTxStart()) {
source.setTxInterrupt("ROLLBACK");
}
multiNodeHandler.waitAllConnConnectorError();
closeConnections();
setResponseTime(false);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
if (this.isPrepared()) {
@@ -827,6 +833,18 @@ public class NonBlockingSession implements Session {
}
private void closeConnections() {
Iterator<Entry<RouteResultsetNode, BackendConnection>> iter = target.entrySet().iterator();
while (iter.hasNext()) {
Entry<RouteResultsetNode, BackendConnection> entry = iter.next();
BackendConnection c = entry.getValue();
iter.remove();
if (c != null) {
c.setResponseHandler(null);
c.close("other node prepare conns failed");
}
}
}
public void waitFinishConnection(RouteResultsetNode rrn) {
BackendConnection c = target.get(rrn);
@@ -836,7 +854,7 @@ public class NonBlockingSession implements Session {
}
}
// thread may not safe
public void releaseConnections(final boolean needClosed) {
boolean debug = LOGGER.isDebugEnabled();
for (RouteResultsetNode rrn : target.keySet()) {