From b85e9c6e6857bf87ebc3db6bb404cd551f2840f6 Mon Sep 17 00:00:00 2001 From: Baofengqi <838800176@qq.com> Date: Thu, 8 Aug 2019 14:18:50 +0800 Subject: [PATCH] support lock table and unlock tables in transaction (#1297) * support lock table and unlock tables in transaction * revert copyResources.bat * fix review --- copyResources.bat | 0 .../AbstractCommitNodesHandler.java | 12 ++++++---- .../transaction/CommitNodesHandler.java | 1 + .../transaction/ImplictCommitHandler.java | 10 +++++++++ .../normal/NormalCommitNodesHandler.java | 16 +++++++++----- .../transaction/xa/XACommitNodesHandler.java | 1 + .../dble/server/NonBlockingSession.java | 6 +++++ .../dble/server/ServerConnection.java | 22 ++++++++++++------- 8 files changed, 50 insertions(+), 18 deletions(-) mode change 100644 => 100755 copyResources.bat create mode 100644 src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/ImplictCommitHandler.java diff --git a/copyResources.bat b/copyResources.bat old mode 100644 new mode 100755 diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java index 88e65a5b2..051aa6a33 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/AbstractCommitNodesHandler.java @@ -21,14 +21,14 @@ import java.util.Set; public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implements CommitNodesHandler { protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractCommitNodesHandler.class); protected Set closedConnSet; + protected ImplictCommitHandler implictCommitHandler = null; + public AbstractCommitNodesHandler(NonBlockingSession session) { super(session); } protected abstract boolean executeCommit(MySQLConnection mysqlCon, int position); - - @Override public void rowEofResponse(byte[] eof, boolean isLeft, BackendConnection conn) { LOGGER.info("unexpected packet for " + conn + @@ -59,12 +59,11 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem @Override public void writeQueueAvailable() { - } public void debugCommitDelay() { - } + protected boolean checkClosedConn(BackendConnection conn) { lock.lock(); try { @@ -82,4 +81,9 @@ public abstract class AbstractCommitNodesHandler extends MultiNodeHandler implem lock.unlock(); } } + + @Override + public void setImplictCommitHandler(ImplictCommitHandler implictCommitHandler) { + this.implictCommitHandler = implictCommitHandler; + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java index 0da28e477..cbb41b359 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/CommitNodesHandler.java @@ -7,5 +7,6 @@ package com.actiontech.dble.backend.mysql.nio.handler.transaction; public interface CommitNodesHandler { void commit(); + void setImplictCommitHandler(ImplictCommitHandler handler); void clearResources(); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/ImplictCommitHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/ImplictCommitHandler.java new file mode 100644 index 000000000..b2159d4b6 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/ImplictCommitHandler.java @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2016-2019 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.backend.mysql.nio.handler.transaction; + +public interface ImplictCommitHandler { + void next(); +} diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java index fc818fbd4..1cf6306d5 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/normal/NormalCommitNodesHandler.java @@ -24,8 +24,8 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { } finally { lock.unlock(); } - int position = 0; + int position = 0; for (RouteResultsetNode rrn : session.getTargetKeys()) { final BackendConnection conn = session.getTarget(rrn); conn.setResponseHandler(this); @@ -33,11 +33,12 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { break; } } - } + @Override public void clearResources() { sendData = null; + implictCommitHandler = null; if (closedConnSet != null) { closedConnSet.clear(); } @@ -56,7 +57,7 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { @Override public void okResponse(byte[] ok, BackendConnection conn) { if (decrementCountBy(1)) { - if (sendData == null) { + if (implictCommitHandler == null && sendData == null) { sendData = session.getOkByteArray(); } cleanAndFeedback(); @@ -107,15 +108,18 @@ public class NormalCommitNodesHandler extends AbstractCommitNodesHandler { if (this.isFail()) { createErrPkg(error).write(session.getSource()); setResponseTime(false); + } else if (implictCommitHandler != null) { + // continue to execute sql + implictCommitHandler.next(); } else { setResponseTime(true); session.getSource().write(send); - session.clearSavepoint(); - boolean multiStatementFlag = session.getIsMultiStatement().get(); - session.multiStatementNextSql(multiStatementFlag); } + session.clearSavepoint(); + session.multiStatementNextSql(session.getIsMultiStatement().get()); } protected void setResponseTime(boolean isSuccess) { } + } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java index d2d325c36..22e1ecfe5 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java @@ -92,6 +92,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler { backgroundCommitTimes = 0; participantLogEntry = null; sendData = OkPacket.OK; + implictCommitHandler = null; xaOldThreadIds.clear(); if (closedConnSet != null) { closedConnSet.clear(); diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index 9986a7ce1..1586c1d98 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -15,6 +15,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.builder.HandlerBuilder; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler; import com.actiontech.dble.backend.mysql.nio.handler.transaction.CommitNodesHandler; +import com.actiontech.dble.backend.mysql.nio.handler.transaction.ImplictCommitHandler; import com.actiontech.dble.backend.mysql.nio.handler.transaction.RollbackNodesHandler; import com.actiontech.dble.backend.mysql.nio.handler.transaction.normal.NormalCommitNodesHandler; import com.actiontech.dble.backend.mysql.nio.handler.transaction.normal.NormalRollbackNodesHandler; @@ -642,6 +643,11 @@ public class NonBlockingSession implements Session { commitHandler.commit(); } + public void implictCommit(ImplictCommitHandler handler) { + commit(); + commitHandler.setImplictCommitHandler(handler); + } + public void performSavePoint(String spName, SavePointHandler.Type type) { if (savePointHandler == null) { savePointHandler = new SavePointHandler(this); diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java index c3f323d2f..c05cfe9d6 100644 --- a/src/main/java/com/actiontech/dble/server/ServerConnection.java +++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java @@ -6,6 +6,7 @@ package com.actiontech.dble.server; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.mysql.nio.handler.transaction.ImplictCommitHandler; import com.actiontech.dble.backend.mysql.nio.handler.transaction.savepoint.SavePointHandler; import com.actiontech.dble.backend.mysql.xa.TxState; import com.actiontech.dble.config.ErrorCode; @@ -435,12 +436,20 @@ public class ServerConnection extends FrontendConnection { } void lockTable(String sql) { - // lock table is disable in transaction - if (!autocommit || isTxStart()) { - writeErrMessage(ErrorCode.ER_YES, "can't lock tables in transaction in dble!"); + // except xa transaction + if ((!isAutocommit() || isTxStart()) && session.getSessionXaID() == null) { + session.implictCommit(new ImplictCommitHandler() { + @Override + public void next() { + doLockTable(sql); + } + }); return; } + doLockTable(sql); + } + private void doLockTable(String sql) { String db = this.schema; SchemaConfig schema = null; if (this.schema != null) { @@ -450,6 +459,7 @@ public class ServerConnection extends FrontendConnection { return; } } + RouteResultset rrs; try { rrs = DbleServer.getInstance().getRouterService().route(schema, ServerParse.LOCK, sql, this); @@ -457,16 +467,13 @@ public class ServerConnection extends FrontendConnection { executeException(e, sql); return ; } + if (rrs != null) { session.lockTable(rrs); } } void unLockTable(String sql) { - if (!autocommit || isTxStart()) { - writeErrMessage(ErrorCode.ER_YES, "can't unlock tables in transaction in dble!"); - return; - } sql = sql.replaceAll("\n", " ").replaceAll("\t", " "); String[] words = SplitUtil.split(sql, ' ', true); if (words.length == 2 && ("table".equalsIgnoreCase(words[1]) || "tables".equalsIgnoreCase(words[1]))) { @@ -475,7 +482,6 @@ public class ServerConnection extends FrontendConnection { } else { writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command"); } - } @Override