support lock table and unlock tables in transaction (#1297)

* support lock table and unlock tables in transaction

* revert copyResources.bat

* fix review
This commit is contained in:
Baofengqi
2019-08-08 14:18:50 +08:00
committed by tiger.yan
parent cfff7cae5a
commit b85e9c6e68
8 changed files with 50 additions and 18 deletions

0
copyResources.bat Normal file → Executable file
View File

View File

@@ -21,14 +21,14 @@ import java.util.Set;
public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implements CommitNodesHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractCommitNodesHandler.class);
protected Set<BackendConnection> closedConnSet;
protected ImplictCommitHandler implictCommitHandler = null;
public AbstractCommitNodesHandler(NonBlockingSession session) {
super(session);
}
protected abstract boolean executeCommit(MySQLConnection mysqlCon, int position);
@Override
public void rowEofResponse(byte[] eof, boolean isLeft, BackendConnection conn) {
LOGGER.info("unexpected packet for " + conn +
@@ -59,12 +59,11 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem
@Override
public void writeQueueAvailable() {
}
public void debugCommitDelay() {
}
protected boolean checkClosedConn(BackendConnection conn) {
lock.lock();
try {
@@ -82,4 +81,9 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem
lock.unlock();
}
}
@Override
public void setImplictCommitHandler(ImplictCommitHandler implictCommitHandler) {
this.implictCommitHandler = implictCommitHandler;
}
}

View File

@@ -7,5 +7,6 @@ package com.actiontech.dble.backend.mysql.nio.handler.transaction;
public interface CommitNodesHandler {
void commit();
void setImplictCommitHandler(ImplictCommitHandler handler);
void clearResources();
}

View File

@@ -0,0 +1,10 @@
/*
* Copyright (C) 2016-2019 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.backend.mysql.nio.handler.transaction;
public interface ImplictCommitHandler {
void next();
}

View File

@@ -24,8 +24,8 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
} finally {
lock.unlock();
}
int position = 0;
int position = 0;
for (RouteResultsetNode rrn : session.getTargetKeys()) {
final BackendConnection conn = session.getTarget(rrn);
conn.setResponseHandler(this);
@@ -33,11 +33,12 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
break;
}
}
}
@Override
public void clearResources() {
sendData = null;
implictCommitHandler = null;
if (closedConnSet != null) {
closedConnSet.clear();
}
@@ -56,7 +57,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
if (decrementCountBy(1)) {
if (sendData == null) {
if (implictCommitHandler == null && sendData == null) {
sendData = session.getOkByteArray();
}
cleanAndFeedback();
@@ -107,15 +108,18 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler {
if (this.isFail()) {
createErrPkg(error).write(session.getSource());
setResponseTime(false);
} else if (implictCommitHandler != null) {
// continue to execute sql
implictCommitHandler.next();
} else {
setResponseTime(true);
session.getSource().write(send);
session.clearSavepoint();
boolean multiStatementFlag = session.getIsMultiStatement().get();
session.multiStatementNextSql(multiStatementFlag);
}
session.clearSavepoint();
session.multiStatementNextSql(session.getIsMultiStatement().get());
}
protected void setResponseTime(boolean isSuccess) {
}
}

View File

@@ -92,6 +92,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
backgroundCommitTimes = 0;
participantLogEntry = null;
sendData = OkPacket.OK;
implictCommitHandler = null;
xaOldThreadIds.clear();
if (closedConnSet != null) {
closedConnSet.clear();

View File

@@ -15,6 +15,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.builder.HandlerBuilder;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.CommitNodesHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.ImplictCommitHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.RollbackNodesHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.normal.NormalCommitNodesHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.normal.NormalRollbackNodesHandler;
@@ -642,6 +643,11 @@ public class NonBlockingSession implements Session {
commitHandler.commit();
}
public void implictCommit(ImplictCommitHandler handler) {
commit();
commitHandler.setImplictCommitHandler(handler);
}
public void performSavePoint(String spName, SavePointHandler.Type type) {
if (savePointHandler == null) {
savePointHandler = new SavePointHandler(this);

View File

@@ -6,6 +6,7 @@
package com.actiontech.dble.server;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.ImplictCommitHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.savepoint.SavePointHandler;
import com.actiontech.dble.backend.mysql.xa.TxState;
import com.actiontech.dble.config.ErrorCode;
@@ -435,12 +436,20 @@ public class ServerConnection extends FrontendConnection {
}
void lockTable(String sql) {
// lock table is disable in transaction
if (!autocommit || isTxStart()) {
writeErrMessage(ErrorCode.ER_YES, "can't lock tables in transaction in dble!");
// except xa transaction
if ((!isAutocommit() || isTxStart()) && session.getSessionXaID() == null) {
session.implictCommit(new ImplictCommitHandler() {
@Override
public void next() {
doLockTable(sql);
}
});
return;
}
doLockTable(sql);
}
private void doLockTable(String sql) {
String db = this.schema;
SchemaConfig schema = null;
if (this.schema != null) {
@@ -450,6 +459,7 @@ public class ServerConnection extends FrontendConnection {
return;
}
}
RouteResultset rrs;
try {
rrs = DbleServer.getInstance().getRouterService().route(schema, ServerParse.LOCK, sql, this);
@@ -457,16 +467,13 @@ public class ServerConnection extends FrontendConnection {
executeException(e, sql);
return ;
}
if (rrs != null) {
session.lockTable(rrs);
}
}
void unLockTable(String sql) {
if (!autocommit || isTxStart()) {
writeErrMessage(ErrorCode.ER_YES, "can't unlock tables in transaction in dble!");
return;
}
sql = sql.replaceAll("\n", " ").replaceAll("\t", " ");
String[] words = SplitUtil.split(sql, ' ', true);
if (words.length == 2 && ("table".equalsIgnoreCase(words[1]) || "tables".equalsIgnoreCase(words[1]))) {
@@ -475,7 +482,6 @@ public class ServerConnection extends FrontendConnection {
} else {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
}
}
@Override