diff --git a/src/main/java/com/actiontech/dble/backend/mysql/ByteUtil.java b/src/main/java/com/actiontech/dble/backend/mysql/ByteUtil.java index b29b7c7fa..fc3267864 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/ByteUtil.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/ByteUtil.java @@ -5,6 +5,8 @@ */ package com.actiontech.dble.backend.mysql; +import java.nio.ByteBuffer; + /** * @author mycat */ @@ -106,4 +108,17 @@ public final class ByteUtil { public static void writeUB3(byte[] packet, int length) { writeUB3(packet, length, 0); } + + public static void writeUB3(ByteBuffer buffer, int val, int offset) { + buffer.put(offset, (byte) (val & 0xff)); + buffer.put(offset + 1, (byte) (val >>> 8)); + buffer.put(offset + 2, (byte) (val >>> 16)); + } + + public static void writeUB4(byte[] packet, long l, int offset) { + packet[offset] = (byte) (l & 0xff); + packet[offset + 1] = (byte) (l >>> 8); + packet[offset + 2] = (byte) (l >>> 16); + packet[offset + 3] = (byte) (l >>> 24); + } } diff --git a/src/main/java/com/actiontech/dble/net/handler/BackEndRecycleRunnable.java b/src/main/java/com/actiontech/dble/net/handler/BackEndRecycleRunnable.java index 5ddae94c7..08c2d25cd 100644 --- a/src/main/java/com/actiontech/dble/net/handler/BackEndRecycleRunnable.java +++ b/src/main/java/com/actiontech/dble/net/handler/BackEndRecycleRunnable.java @@ -66,6 +66,7 @@ public class BackEndRecycleRunnable implements Runnable, BackEndCleaner { lock.lock(); try { service.setRowDataFlowing(false); + service.setPrepare(false); condRelease.signal(); } finally { lock.unlock(); diff --git a/src/main/java/com/actiontech/dble/net/mysql/PreparedClosePacket.java b/src/main/java/com/actiontech/dble/net/mysql/PreparedClosePacket.java new file mode 100644 index 000000000..bd44cf072 --- /dev/null +++ b/src/main/java/com/actiontech/dble/net/mysql/PreparedClosePacket.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.net.mysql; + +import com.actiontech.dble.backend.mysql.BufferUtil; +import com.actiontech.dble.net.connection.AbstractConnection; +import com.actiontech.dble.net.service.AbstractService; + +import java.nio.ByteBuffer; + +/** + *
+ * COM_STMT_CLOSE deallocates a prepared statement
+ *
+ * No response is sent back to the client.
+ *
+ * COM_STMT_CLOSE:
+ *  Bytes              Name
+ *  -----              ----
+ *  1                  [19] COM_STMT_CLOSE
+ *  4                  statement-id
+ *
+ *  @see https://dev.mysql.com/doc/internals/en/com-stmt-close.html
+ * 
+ * + * @author collapsar + */ +public class PreparedClosePacket extends MySQLPacket { + + private long statementId; + + public PreparedClosePacket(long statementId) { + this.statementId = statementId; + } + + @Override + public ByteBuffer write(ByteBuffer buffer, AbstractService service, boolean writeSocketIfFull) { + int size = calcPacketSize(); + buffer = service.checkWriteBuffer(buffer, PACKET_HEADER_SIZE + size, writeSocketIfFull); + BufferUtil.writeUB3(buffer, size); + buffer.put(packetId); + buffer.put((byte) 0x19); + BufferUtil.writeUB4(buffer, statementId); + return buffer; + } + + @Override + public void bufferWrite(AbstractConnection connection) { + int size = calcPacketSize(); + ByteBuffer buffer = connection.allocate(PACKET_HEADER_SIZE + size); + BufferUtil.writeUB3(buffer, size); + buffer.put(packetId); + buffer.put((byte) 0x19); + BufferUtil.writeUB4(buffer, statementId); + connection.write(buffer); + } + + @Override + public int calcPacketSize() { + return 5; + } + + @Override + protected String getPacketInfo() { + return "MySQL Prepared Close Packet"; + } + + @Override + public boolean isEndOfQuery() { + return true; + } + + public long getStatementId() { + return statementId; + } + + public void setStatementId(long statementId) { + this.statementId = statementId; + } + +} diff --git a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java index beb87724a..6b12e7b8e 100644 --- a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java @@ -2,11 +2,11 @@ package com.actiontech.dble.rwsplit; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.backend.mysql.ByteUtil; import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.net.connection.BackendConnection; -import com.actiontech.dble.services.rwsplit.Callback; -import com.actiontech.dble.services.rwsplit.RWSplitHandler; -import com.actiontech.dble.services.rwsplit.RWSplitService; +import com.actiontech.dble.net.mysql.MySQLPacket; +import com.actiontech.dble.services.rwsplit.*; import com.actiontech.dble.singleton.RouteService; import com.actiontech.dble.util.StringUtil; import org.jetbrains.annotations.Nullable; @@ -72,12 +72,23 @@ public class RWSplitNonBlockingSession { } @Nullable - private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback) throws SQLSyntaxErrorException { + private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback) throws SQLSyntaxErrorException, IOException { RWSplitHandler handler = new RWSplitHandler(rwSplitService, originPacket, callback, false); if (conn != null && !conn.isClosed()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("select bind conn[id={}]", conn.getId()); } + // for ps needs to send master + if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) { + long statementId = ByteUtil.readUB4(originPacket, 5); + PreparedStatementHolder holder = rwSplitService.getPrepareStatement(statementId); + if (holder.isMustMaster() && conn.getInstance().isReadInstance()) { + holder.setExecuteOrigin(originPacket); + PSHandler psHandler = new PSHandler(rwSplitService, holder); + psHandler.execute(rwGroup); + return null; + } + } checkDest(!conn.getInstance().isReadInstance()); handler.execute(conn); return null; @@ -170,7 +181,7 @@ public class RWSplitNonBlockingSession { public void unbindIfSafe() { if (rwSplitService.isAutocommit() && !rwSplitService.isTxStart() && !rwSplitService.isLocked() && !rwSplitService.isInLoadData() && - !rwSplitService.isInPrepare() && this.conn != null) { + !rwSplitService.isInPrepare() && this.conn != null && rwSplitService.getPsHolder().isEmpty()) { this.conn.release(); this.conn = null; } diff --git a/src/main/java/com/actiontech/dble/services/BusinessService.java b/src/main/java/com/actiontech/dble/services/BusinessService.java index 07b6ec560..06c2074a8 100644 --- a/src/main/java/com/actiontech/dble/services/BusinessService.java +++ b/src/main/java/com/actiontech/dble/services/BusinessService.java @@ -15,6 +15,8 @@ public abstract class BusinessService extends FrontEndService { protected volatile boolean txStarted; protected final AtomicLong queriesCounter = new AtomicLong(0); protected final AtomicLong transactionsCounter = new AtomicLong(0); + private volatile boolean isLockTable; + public BusinessService(AbstractConnection connection) { super(connection); @@ -61,6 +63,15 @@ public abstract class BusinessService extends FrontEndService { transactionsCounter.set(Long.MIN_VALUE); } + public boolean isLockTable() { + return isLockTable; + } + + public void setLockTable(boolean locked) { + isLockTable = locked; + } + + public void executeContextSetTask(MysqlVariable[] contextTask) { MysqlVariable autocommitItem = null; diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java index 315cdddec..ae1c596da 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -86,6 +87,9 @@ public class MySQLResponseService extends VariablesService { protected BackendConnection connection; + private volatile boolean prepare = false; + + static { COMMIT.setPacketId(0); COMMIT.setCommand(MySQLPacket.COM_QUERY); @@ -147,7 +151,9 @@ public class MySQLResponseService extends VariablesService { } return; } - if (prepareOK) { + if (prepareOK && prepare) { + baseLogicHandler.handleInnerData(data); + } else if (prepareOK) { prepareLogicHandler.handleInnerData(data); } else if (statisticResponse) { statisticsLogicHandler.handleInnerData(data); @@ -537,6 +543,7 @@ public class MySQLResponseService extends VariablesService { statusSync = null; isDDL = false; testing = false; + setPrepare(false); setResponseHandler(null); setSession(null); logResponse.set(false); @@ -647,6 +654,18 @@ public class MySQLResponseService extends VariablesService { } } + // only for com_stmt_execute + public void execute(ByteBuffer buffer) { + setPrepare(true); + writeDirectly(buffer); + } + + // only for com_stmt_execute + public void execute(byte[] originPacket) { + setPrepare(true); + writeDirectly(originPacket); + } + // the purpose is to set old schema to null private void changeUser() { DbInstanceConfig config = connection.getInstance().getConfig(); @@ -850,6 +869,10 @@ public class MySQLResponseService extends VariablesService { isExecuting = executing; } + public void setPrepare(boolean prepare) { + this.prepare = prepare; + } + public String toString() { return "MySQLResponseService[isExecuting = " + isExecuting + " attachment = " + attachment + " autocommitSynced = " + autocommitSynced + " isolationSynced = " + isolationSynced + " xaStatus = " + xaStatus + " isDDL = " + isDDL + " complexQuery = " + complexQuery + "] with response handler [" + responseHandler + "] with rrs = [" + diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java new file mode 100644 index 000000000..829be817d --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/rwsplit/PSHandler.java @@ -0,0 +1,137 @@ +package com.actiontech.dble.services.rwsplit; + +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.backend.mysql.ByteUtil; +import com.actiontech.dble.backend.mysql.nio.handler.PreparedResponseHandler; +import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.net.connection.BackendConnection; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.net.mysql.PreparedClosePacket; +import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.net.service.AbstractService; +import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; +import com.actiontech.dble.net.connection.AbstractConnection; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +public class PSHandler implements ResponseHandler, PreparedResponseHandler { + + private final RWSplitService rwSplitService; + private final AbstractConnection frontedConnection; + private final PreparedStatementHolder holder; + private boolean write2Client = false; + private long originStatementId; + + public PSHandler(RWSplitService service, PreparedStatementHolder holder) { + this.rwSplitService = service; + this.frontedConnection = service.getConnection(); + this.holder = holder; + } + + public void execute(PhysicalDbGroup rwGroup) throws IOException { + PhysicalDbInstance instance = rwGroup.select(true, true); + instance.getConnection(rwSplitService.getSchema(), this, null, true); + } + + @Override + public void connectionAcquired(BackendConnection conn) { + MySQLResponseService mysqlService = conn.getBackendService(); + mysqlService.setResponseHandler(this); + mysqlService.execute(rwSplitService, holder.getPrepareOrigin()); + } + + @Override + public void preparedOkResponse(byte[] ok, List fields, List params, MySQLResponseService service) { + originStatementId = ByteUtil.readUB4(ok, 5); + ByteUtil.writeUB4(holder.getExecuteOrigin(), originStatementId, 5); + int length = ByteUtil.readUB3(holder.getExecuteOrigin(), 0); + int paramsCount = holder.getParamsCount(); + int nullBitMapSize = (paramsCount + 7) / 8; + byte[] originExecuteByte = holder.getExecuteOrigin(); + if (holder.isNeedAddFieldType()) { + length += paramsCount * 2; + ByteBuffer buffer = service.allocate(originExecuteByte.length + paramsCount * 2); + buffer.put(originExecuteByte, 0, 14 + nullBitMapSize); + ByteUtil.writeUB3(buffer, length, 0); + //flag type + buffer.put(14 + nullBitMapSize, (byte) 1); + //field type + buffer.position(15 + nullBitMapSize); + byte[] fileType = holder.getFieldType(); + buffer.put(fileType, 0, fileType.length); + buffer.position(15 + nullBitMapSize + paramsCount * 2); + buffer.put(originExecuteByte, 15 + nullBitMapSize, originExecuteByte.length - (15 + nullBitMapSize)); + service.setResponseHandler(this); + service.execute(buffer); + } else { + service.setResponseHandler(this); + service.execute(holder.getExecuteOrigin()); + } + } + + @Override + public void connectionError(Throwable e, Object attachment) { + rwSplitService.writeErrMessage(ErrorCode.ER_DB_INSTANCE_ABORTING_CONNECTION, "can't connect to dbGroup[" + rwSplitService.getUserConfig().getDbGroup()); + } + + @Override + public void errorResponse(byte[] err, AbstractService service) { + synchronized (this) { + if (!write2Client) { + err[3] = (byte) rwSplitService.nextPacketId(); + frontedConnection.getService().writeDirectly(err); + write2Client = true; + PreparedClosePacket close = new PreparedClosePacket(originStatementId); + close.bufferWrite(service.getConnection()); + ((MySQLResponseService) service).release(); + } + } + } + + @Override + public void okResponse(byte[] ok, AbstractService service) { + MySQLResponseService mysqlService = (MySQLResponseService) service; + boolean executeResponse = mysqlService.syncAndExecute(); + if (executeResponse) { + synchronized (this) { + if (!write2Client) { + frontedConnection.getService().writeDirectly(ok); + write2Client = true; + PreparedClosePacket close = new PreparedClosePacket(originStatementId); + close.bufferWrite(service.getConnection()); + ((MySQLResponseService) service).release(); + } + } + } + } + + @Override + public void connectionClose(AbstractService service, String reason) { + rwSplitService.getConnection().close("backend connection is close in ps"); + } + + @Override + public void preparedExecuteResponse(byte[] header, List fields, byte[] eof, MySQLResponseService service) { + } + + @Override + public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof, boolean isLeft, AbstractService service) { + + } + + @Override + public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, AbstractService service) { + return false; + } + + @Override + public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service) { + + } + +} diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/PreparedStatementHolder.java b/src/main/java/com/actiontech/dble/services/rwsplit/PreparedStatementHolder.java new file mode 100644 index 000000000..d3adc89f2 --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/rwsplit/PreparedStatementHolder.java @@ -0,0 +1,61 @@ +package com.actiontech.dble.services.rwsplit; + +public class PreparedStatementHolder { + + private final byte[] prepareOrigin; + private final int paramsCount; + private byte[] executeOrigin; + private final boolean mustMaster; + private byte[] fieldType; + private boolean needAddFieldType; + private String prepareSql; + + public PreparedStatementHolder(byte[] prepareOrigin, int paramsCount, boolean mustMaster, String sql) { + this.prepareOrigin = prepareOrigin; + this.paramsCount = paramsCount; + this.mustMaster = mustMaster; + this.prepareSql = sql; + } + + public boolean isMustMaster() { + return mustMaster; + } + + public byte[] getPrepareOrigin() { + return prepareOrigin; + } + + public byte[] getExecuteOrigin() { + return executeOrigin; + } + + public void setExecuteOrigin(byte[] executeOrigin) { + int nullBitMapSize = (paramsCount + 7) / 8; + byte newParameterBoundFlag = executeOrigin[14 + nullBitMapSize]; + if (newParameterBoundFlag == (byte) 1) { + fieldType = new byte[2 * paramsCount]; + System.arraycopy(executeOrigin, 15 + nullBitMapSize, fieldType, 0, 2 * paramsCount); + + } else if (fieldType != null) { + needAddFieldType = true; + } + this.executeOrigin = executeOrigin; + } + + public byte[] getFieldType() { + return fieldType; + } + + // doesn't contain field type except first execute packet + public boolean isNeedAddFieldType() { + return needAddFieldType; + } + + public int getParamsCount() { + return paramsCount; + } + + public String getPrepareSql() { + return prepareSql; + } +} diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java index 9b6d3dfc4..e31cd4155 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java @@ -37,7 +37,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler, /** * If there are more packets next.This flag in would be set. */ - private static final int HAS_MORE_RESULTS = 0x08; + private static final int HAS_MORE_RESULTS = 0x0008; public RWSplitHandler(RWSplitService service, byte[] originPacket, Callback callback, boolean isHint) { this.rwSplitService = service; diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java index 4ffe8d7e1..701c39d70 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java @@ -1,6 +1,7 @@ package com.actiontech.dble.services.rwsplit; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.mysql.ByteUtil; import com.actiontech.dble.backend.mysql.MySQLMessage; import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.config.model.user.RwSplitUserConfig; @@ -20,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -40,6 +42,11 @@ public class RWSplitService extends BusinessService { private final RWSplitQueryHandler queryHandler; private final RWSplitNonBlockingSession session; + public static final int LOCK_READ = 2; + + // prepare statement + private ConcurrentHashMap psHolder = new ConcurrentHashMap<>(); + public RWSplitService(AbstractConnection connection) { super(connection); this.session = new RWSplitNonBlockingSession(this); @@ -123,11 +130,14 @@ public class RWSplitService extends BusinessService { break; case MySQLPacket.COM_STMT_CLOSE: commands.doStmtClose(); - session.execute(true, data, (isSuccess, resp, rwSplitService) -> { - rwSplitService.setInPrepare(false); - }); + session.execute(true, data, null); + // COM_STMT_CLOSE No response is sent back to the client. + inPrepare = false; + long statementId = ByteUtil.readUB4(data, 5); + psHolder.remove(statementId); + session.unbindIfSafe(); break; - // connection + // connection case MySQLPacket.COM_QUIT: commands.doQuit(); session.close("quit cmd"); @@ -198,19 +208,42 @@ public class RWSplitService extends BusinessService { try { inPrepare = true; String sql = mm.readString(getCharset().getClient()); + if (sql.endsWith(";")) { + sql = sql.substring(0, sql.length() - 1).trim(); + } + sql = sql.trim(); + final String finalSql = sql; int rs = ServerParse.parse(sql); int sqlType = rs & 0xff; switch (sqlType) { case ServerParse.SELECT: int rs2 = ServerParse.parseSpecial(sqlType, sql); - if (rs2 == ServerParse.SELECT_FOR_UPDATE || rs2 == ServerParse.LOCK_IN_SHARE_MODE) { - session.execute(true, data, null); + if (rs2 == LOCK_READ) { + session.execute(true, data, (isSuccess, resp, rwSplitService) -> { + if (isSuccess) { + long statementId = ByteUtil.readUB4(resp, 5); + int paramCount = ByteUtil.readUB2(resp, 11); + psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql)); + } + }, false); } else { - session.execute(null, data, null); + session.execute(null, data, (isSuccess, resp, rwSplitService) -> { + if (isSuccess) { + long statementId = ByteUtil.readUB4(resp, 5); + int paramCount = ByteUtil.readUB2(resp, 11); + psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql)); + } + }, false); } break; default: - session.execute(true, data, null); + session.execute(true, data, (isSuccess, resp, rwSplitService) -> { + if (isSuccess) { + long statementId = ByteUtil.readUB4(resp, 5); + int paramCount = ByteUtil.readUB2(resp, 11); + psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql)); + } + }); break; } } catch (IOException e) { @@ -259,6 +292,7 @@ public class RWSplitService extends BusinessService { public byte[] getExecuteSqlBytes() { return executeSqlBytes; } + public boolean isInPrepare() { return inPrepare; } @@ -289,4 +323,14 @@ public class RWSplitService extends BusinessService { session.close("clean up"); } } + + + public PreparedStatementHolder getPrepareStatement(long id) { + return psHolder.get(id); + } + + public ConcurrentHashMap getPsHolder() { + return psHolder; + } + }