strength prepare statement protocol inner 1475

This commit is contained in:
ylz
2024-03-26 03:59:53 +08:00
parent b3647b0f87
commit 30445305c4
10 changed files with 402 additions and 15 deletions

View File

@@ -5,6 +5,8 @@
*/
package com.actiontech.dble.backend.mysql;
import java.nio.ByteBuffer;
/**
* @author mycat
*/
@@ -106,4 +108,17 @@ public final class ByteUtil {
public static void writeUB3(byte[] packet, int length) {
writeUB3(packet, length, 0);
}
public static void writeUB3(ByteBuffer buffer, int val, int offset) {
buffer.put(offset, (byte) (val & 0xff));
buffer.put(offset + 1, (byte) (val >>> 8));
buffer.put(offset + 2, (byte) (val >>> 16));
}
public static void writeUB4(byte[] packet, long l, int offset) {
packet[offset] = (byte) (l & 0xff);
packet[offset + 1] = (byte) (l >>> 8);
packet[offset + 2] = (byte) (l >>> 16);
packet[offset + 3] = (byte) (l >>> 24);
}
}

View File

@@ -66,6 +66,7 @@ public class BackEndRecycleRunnable implements Runnable, BackEndCleaner {
lock.lock();
try {
service.setRowDataFlowing(false);
service.setPrepare(false);
condRelease.signal();
} finally {
lock.unlock();

View File

@@ -0,0 +1,84 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.net.mysql;
import com.actiontech.dble.backend.mysql.BufferUtil;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;
import java.nio.ByteBuffer;
/**
* <pre>
* COM_STMT_CLOSE deallocates a prepared statement
*
* No response is sent back to the client.
*
* COM_STMT_CLOSE:
* Bytes Name
* ----- ----
* 1 [19] COM_STMT_CLOSE
* 4 statement-id
*
* @see https://dev.mysql.com/doc/internals/en/com-stmt-close.html
* </pre>
*
* @author collapsar
*/
public class PreparedClosePacket extends MySQLPacket {
private long statementId;
public PreparedClosePacket(long statementId) {
this.statementId = statementId;
}
@Override
public ByteBuffer write(ByteBuffer buffer, AbstractService service, boolean writeSocketIfFull) {
int size = calcPacketSize();
buffer = service.checkWriteBuffer(buffer, PACKET_HEADER_SIZE + size, writeSocketIfFull);
BufferUtil.writeUB3(buffer, size);
buffer.put(packetId);
buffer.put((byte) 0x19);
BufferUtil.writeUB4(buffer, statementId);
return buffer;
}
@Override
public void bufferWrite(AbstractConnection connection) {
int size = calcPacketSize();
ByteBuffer buffer = connection.allocate(PACKET_HEADER_SIZE + size);
BufferUtil.writeUB3(buffer, size);
buffer.put(packetId);
buffer.put((byte) 0x19);
BufferUtil.writeUB4(buffer, statementId);
connection.write(buffer);
}
@Override
public int calcPacketSize() {
return 5;
}
@Override
protected String getPacketInfo() {
return "MySQL Prepared Close Packet";
}
@Override
public boolean isEndOfQuery() {
return true;
}
public long getStatementId() {
return statementId;
}
public void setStatementId(long statementId) {
this.statementId = statementId;
}
}

View File

@@ -2,11 +2,11 @@ package com.actiontech.dble.rwsplit;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.services.rwsplit.Callback;
import com.actiontech.dble.services.rwsplit.RWSplitHandler;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.services.rwsplit.*;
import com.actiontech.dble.singleton.RouteService;
import com.actiontech.dble.util.StringUtil;
import org.jetbrains.annotations.Nullable;
@@ -72,12 +72,23 @@ public class RWSplitNonBlockingSession {
}
@Nullable
private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback) throws SQLSyntaxErrorException {
private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback) throws SQLSyntaxErrorException, IOException {
RWSplitHandler handler = new RWSplitHandler(rwSplitService, originPacket, callback, false);
if (conn != null && !conn.isClosed()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select bind conn[id={}]", conn.getId());
}
// for ps needs to send master
if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) {
long statementId = ByteUtil.readUB4(originPacket, 5);
PreparedStatementHolder holder = rwSplitService.getPrepareStatement(statementId);
if (holder.isMustMaster() && conn.getInstance().isReadInstance()) {
holder.setExecuteOrigin(originPacket);
PSHandler psHandler = new PSHandler(rwSplitService, holder);
psHandler.execute(rwGroup);
return null;
}
}
checkDest(!conn.getInstance().isReadInstance());
handler.execute(conn);
return null;
@@ -170,7 +181,7 @@ public class RWSplitNonBlockingSession {
public void unbindIfSafe() {
if (rwSplitService.isAutocommit() && !rwSplitService.isTxStart() && !rwSplitService.isLocked() &&
!rwSplitService.isInLoadData() &&
!rwSplitService.isInPrepare() && this.conn != null) {
!rwSplitService.isInPrepare() && this.conn != null && rwSplitService.getPsHolder().isEmpty()) {
this.conn.release();
this.conn = null;
}

View File

@@ -15,6 +15,8 @@ public abstract class BusinessService extends FrontEndService {
protected volatile boolean txStarted;
protected final AtomicLong queriesCounter = new AtomicLong(0);
protected final AtomicLong transactionsCounter = new AtomicLong(0);
private volatile boolean isLockTable;
public BusinessService(AbstractConnection connection) {
super(connection);
@@ -61,6 +63,15 @@ public abstract class BusinessService extends FrontEndService {
transactionsCounter.set(Long.MIN_VALUE);
}
public boolean isLockTable() {
return isLockTable;
}
public void setLockTable(boolean locked) {
isLockTable = locked;
}
public void executeContextSetTask(MysqlVariable[] contextTask) {
MysqlVariable autocommitItem = null;

View File

@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -86,6 +87,9 @@ public class MySQLResponseService extends VariablesService {
protected BackendConnection connection;
private volatile boolean prepare = false;
static {
COMMIT.setPacketId(0);
COMMIT.setCommand(MySQLPacket.COM_QUERY);
@@ -147,7 +151,9 @@ public class MySQLResponseService extends VariablesService {
}
return;
}
if (prepareOK) {
if (prepareOK && prepare) {
baseLogicHandler.handleInnerData(data);
} else if (prepareOK) {
prepareLogicHandler.handleInnerData(data);
} else if (statisticResponse) {
statisticsLogicHandler.handleInnerData(data);
@@ -537,6 +543,7 @@ public class MySQLResponseService extends VariablesService {
statusSync = null;
isDDL = false;
testing = false;
setPrepare(false);
setResponseHandler(null);
setSession(null);
logResponse.set(false);
@@ -647,6 +654,18 @@ public class MySQLResponseService extends VariablesService {
}
}
// only for com_stmt_execute
public void execute(ByteBuffer buffer) {
setPrepare(true);
writeDirectly(buffer);
}
// only for com_stmt_execute
public void execute(byte[] originPacket) {
setPrepare(true);
writeDirectly(originPacket);
}
// the purpose is to set old schema to null
private void changeUser() {
DbInstanceConfig config = connection.getInstance().getConfig();
@@ -850,6 +869,10 @@ public class MySQLResponseService extends VariablesService {
isExecuting = executing;
}
public void setPrepare(boolean prepare) {
this.prepare = prepare;
}
public String toString() {
return "MySQLResponseService[isExecuting = " + isExecuting + " attachment = " + attachment + " autocommitSynced = " + autocommitSynced + " isolationSynced = " + isolationSynced +
" xaStatus = " + xaStatus + " isDDL = " + isDDL + " complexQuery = " + complexQuery + "] with response handler [" + responseHandler + "] with rrs = [" +

View File

@@ -0,0 +1,137 @@
package com.actiontech.dble.services.rwsplit;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.backend.mysql.nio.handler.PreparedResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.PreparedClosePacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.net.connection.AbstractConnection;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
public class PSHandler implements ResponseHandler, PreparedResponseHandler {
private final RWSplitService rwSplitService;
private final AbstractConnection frontedConnection;
private final PreparedStatementHolder holder;
private boolean write2Client = false;
private long originStatementId;
public PSHandler(RWSplitService service, PreparedStatementHolder holder) {
this.rwSplitService = service;
this.frontedConnection = service.getConnection();
this.holder = holder;
}
public void execute(PhysicalDbGroup rwGroup) throws IOException {
PhysicalDbInstance instance = rwGroup.select(true, true);
instance.getConnection(rwSplitService.getSchema(), this, null, true);
}
@Override
public void connectionAcquired(BackendConnection conn) {
MySQLResponseService mysqlService = conn.getBackendService();
mysqlService.setResponseHandler(this);
mysqlService.execute(rwSplitService, holder.getPrepareOrigin());
}
@Override
public void preparedOkResponse(byte[] ok, List<byte[]> fields, List<byte[]> params, MySQLResponseService service) {
originStatementId = ByteUtil.readUB4(ok, 5);
ByteUtil.writeUB4(holder.getExecuteOrigin(), originStatementId, 5);
int length = ByteUtil.readUB3(holder.getExecuteOrigin(), 0);
int paramsCount = holder.getParamsCount();
int nullBitMapSize = (paramsCount + 7) / 8;
byte[] originExecuteByte = holder.getExecuteOrigin();
if (holder.isNeedAddFieldType()) {
length += paramsCount * 2;
ByteBuffer buffer = service.allocate(originExecuteByte.length + paramsCount * 2);
buffer.put(originExecuteByte, 0, 14 + nullBitMapSize);
ByteUtil.writeUB3(buffer, length, 0);
//flag type
buffer.put(14 + nullBitMapSize, (byte) 1);
//field type
buffer.position(15 + nullBitMapSize);
byte[] fileType = holder.getFieldType();
buffer.put(fileType, 0, fileType.length);
buffer.position(15 + nullBitMapSize + paramsCount * 2);
buffer.put(originExecuteByte, 15 + nullBitMapSize, originExecuteByte.length - (15 + nullBitMapSize));
service.setResponseHandler(this);
service.execute(buffer);
} else {
service.setResponseHandler(this);
service.execute(holder.getExecuteOrigin());
}
}
@Override
public void connectionError(Throwable e, Object attachment) {
rwSplitService.writeErrMessage(ErrorCode.ER_DB_INSTANCE_ABORTING_CONNECTION, "can't connect to dbGroup[" + rwSplitService.getUserConfig().getDbGroup());
}
@Override
public void errorResponse(byte[] err, AbstractService service) {
synchronized (this) {
if (!write2Client) {
err[3] = (byte) rwSplitService.nextPacketId();
frontedConnection.getService().writeDirectly(err);
write2Client = true;
PreparedClosePacket close = new PreparedClosePacket(originStatementId);
close.bufferWrite(service.getConnection());
((MySQLResponseService) service).release();
}
}
}
@Override
public void okResponse(byte[] ok, AbstractService service) {
MySQLResponseService mysqlService = (MySQLResponseService) service;
boolean executeResponse = mysqlService.syncAndExecute();
if (executeResponse) {
synchronized (this) {
if (!write2Client) {
frontedConnection.getService().writeDirectly(ok);
write2Client = true;
PreparedClosePacket close = new PreparedClosePacket(originStatementId);
close.bufferWrite(service.getConnection());
((MySQLResponseService) service).release();
}
}
}
}
@Override
public void connectionClose(AbstractService service, String reason) {
rwSplitService.getConnection().close("backend connection is close in ps");
}
@Override
public void preparedExecuteResponse(byte[] header, List<byte[]> fields, byte[] eof, MySQLResponseService service) {
}
@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof, boolean isLeft, AbstractService service) {
}
@Override
public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, AbstractService service) {
return false;
}
@Override
public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service) {
}
}

View File

@@ -0,0 +1,61 @@
package com.actiontech.dble.services.rwsplit;
public class PreparedStatementHolder {
private final byte[] prepareOrigin;
private final int paramsCount;
private byte[] executeOrigin;
private final boolean mustMaster;
private byte[] fieldType;
private boolean needAddFieldType;
private String prepareSql;
public PreparedStatementHolder(byte[] prepareOrigin, int paramsCount, boolean mustMaster, String sql) {
this.prepareOrigin = prepareOrigin;
this.paramsCount = paramsCount;
this.mustMaster = mustMaster;
this.prepareSql = sql;
}
public boolean isMustMaster() {
return mustMaster;
}
public byte[] getPrepareOrigin() {
return prepareOrigin;
}
public byte[] getExecuteOrigin() {
return executeOrigin;
}
public void setExecuteOrigin(byte[] executeOrigin) {
int nullBitMapSize = (paramsCount + 7) / 8;
byte newParameterBoundFlag = executeOrigin[14 + nullBitMapSize];
if (newParameterBoundFlag == (byte) 1) {
fieldType = new byte[2 * paramsCount];
System.arraycopy(executeOrigin, 15 + nullBitMapSize, fieldType, 0, 2 * paramsCount);
} else if (fieldType != null) {
needAddFieldType = true;
}
this.executeOrigin = executeOrigin;
}
public byte[] getFieldType() {
return fieldType;
}
// doesn't contain field type except first execute packet
public boolean isNeedAddFieldType() {
return needAddFieldType;
}
public int getParamsCount() {
return paramsCount;
}
public String getPrepareSql() {
return prepareSql;
}
}

View File

@@ -37,7 +37,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
/**
* If there are more packets next.This flag in would be set.
*/
private static final int HAS_MORE_RESULTS = 0x08;
private static final int HAS_MORE_RESULTS = 0x0008;
public RWSplitHandler(RWSplitService service, byte[] originPacket, Callback callback, boolean isHint) {
this.rwSplitService = service;

View File

@@ -1,6 +1,7 @@
package com.actiontech.dble.services.rwsplit;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.backend.mysql.MySQLMessage;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
@@ -20,6 +21,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -40,6 +42,11 @@ public class RWSplitService extends BusinessService {
private final RWSplitQueryHandler queryHandler;
private final RWSplitNonBlockingSession session;
public static final int LOCK_READ = 2;
// prepare statement
private ConcurrentHashMap<Long, PreparedStatementHolder> psHolder = new ConcurrentHashMap<>();
public RWSplitService(AbstractConnection connection) {
super(connection);
this.session = new RWSplitNonBlockingSession(this);
@@ -123,11 +130,14 @@ public class RWSplitService extends BusinessService {
break;
case MySQLPacket.COM_STMT_CLOSE:
commands.doStmtClose();
session.execute(true, data, (isSuccess, resp, rwSplitService) -> {
rwSplitService.setInPrepare(false);
});
session.execute(true, data, null);
// COM_STMT_CLOSE No response is sent back to the client.
inPrepare = false;
long statementId = ByteUtil.readUB4(data, 5);
psHolder.remove(statementId);
session.unbindIfSafe();
break;
// connection
// connection
case MySQLPacket.COM_QUIT:
commands.doQuit();
session.close("quit cmd");
@@ -198,19 +208,42 @@ public class RWSplitService extends BusinessService {
try {
inPrepare = true;
String sql = mm.readString(getCharset().getClient());
if (sql.endsWith(";")) {
sql = sql.substring(0, sql.length() - 1).trim();
}
sql = sql.trim();
final String finalSql = sql;
int rs = ServerParse.parse(sql);
int sqlType = rs & 0xff;
switch (sqlType) {
case ServerParse.SELECT:
int rs2 = ServerParse.parseSpecial(sqlType, sql);
if (rs2 == ServerParse.SELECT_FOR_UPDATE || rs2 == ServerParse.LOCK_IN_SHARE_MODE) {
session.execute(true, data, null);
if (rs2 == LOCK_READ) {
session.execute(true, data, (isSuccess, resp, rwSplitService) -> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql));
}
}, false);
} else {
session.execute(null, data, null);
session.execute(null, data, (isSuccess, resp, rwSplitService) -> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql));
}
}, false);
}
break;
default:
session.execute(true, data, null);
session.execute(true, data, (isSuccess, resp, rwSplitService) -> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql));
}
});
break;
}
} catch (IOException e) {
@@ -259,6 +292,7 @@ public class RWSplitService extends BusinessService {
public byte[] getExecuteSqlBytes() {
return executeSqlBytes;
}
public boolean isInPrepare() {
return inPrepare;
}
@@ -289,4 +323,14 @@ public class RWSplitService extends BusinessService {
session.close("clean up");
}
}
public PreparedStatementHolder getPrepareStatement(long id) {
return psHolder.get(id);
}
public ConcurrentHashMap<Long, PreparedStatementHolder> getPsHolder() {
return psHolder;
}
}