inner-1972:fix gmssl (#3465)

Signed-off-by: dcy10000 <dcy10000@gmail.com>

(cherry picked from commit 822336d796)
Signed-off-by: dcy10000 <dcy10000@gmail.com>
This commit is contained in:
Rico
2022-11-11 13:27:15 +08:00
committed by dcy10000
parent 0a1d616ac3
commit 46d36e6ffd
8 changed files with 58 additions and 42 deletions

View File

@@ -65,7 +65,7 @@ public abstract class AbstractConnection implements Connection {
protected volatile IOProcessor processor;
protected volatile String closeReason;
private volatile ByteBuffer readBuffer;
private volatile ByteBuffer bottomReadBuffer;
protected volatile boolean frontWriteFlowControlled = false;
protected int readBufferChunk;
protected final long startupTime;
@@ -316,7 +316,7 @@ public abstract class AbstractConnection implements Connection {
}
buffer.limit(buffer.position());
buffer.position(offset);
this.setReadBuffer(buffer.compact());
this.setBottomReadBuffer(buffer.compact());
}
public void ensureFreeSpaceOfReadBuffer(ByteBuffer buffer,
@@ -326,7 +326,7 @@ public abstract class AbstractConnection implements Connection {
lastLargeMessageTime = TimeUtil.currentTimeMillis();
buffer.position(offset);
newBuffer.put(buffer);
setReadBuffer(newBuffer);
setBottomReadBuffer(newBuffer);
recycle(buffer);
} else {
if (offset != 0) {
@@ -342,14 +342,14 @@ public abstract class AbstractConnection implements Connection {
// if cur buffer is temper none direct byte buffer and not
// received large message in recent 30 seconds
// then change to direct buffer for performance
ByteBuffer localReadBuffer = this.getReadBuffer();
ByteBuffer localReadBuffer = this.getBottomReadBuffer();
if (localReadBuffer != null && !localReadBuffer.isDirect() &&
lastLargeMessageTime < lastReadTime - 30 * 1000L) { // used temp heap
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("change to direct con read buffer ,cur temp buf size :" + localReadBuffer.capacity());
}
recycle(localReadBuffer);
this.setReadBuffer(processor.getBufferPool().allocate(readBufferChunk));
this.setBottomReadBuffer(processor.getBufferPool().allocate(readBufferChunk));
} else {
if (localReadBuffer != null) {
IODelayProvider.inReadReachEnd();
@@ -502,21 +502,21 @@ public abstract class AbstractConnection implements Connection {
return isClosed.get();
}
public ByteBuffer findNetReadBuffer() {
ByteBuffer tmpReadBuffer = getReadBuffer();
public ByteBuffer findReadBuffer() {
ByteBuffer tmpReadBuffer = getBottomReadBuffer();
if (tmpReadBuffer == null) {
tmpReadBuffer = processor.getBufferPool().allocate(processor.getBufferPool().getChunkSize());
setReadBuffer(tmpReadBuffer);
setBottomReadBuffer(tmpReadBuffer);
}
return tmpReadBuffer;
}
public synchronized void recycleReadBuffer() {
final ByteBuffer tmpReadBuffer = getReadBuffer();
final ByteBuffer tmpReadBuffer = getBottomReadBuffer();
if (tmpReadBuffer != null) {
this.recycle(tmpReadBuffer);
this.setReadBuffer(null);
this.setBottomReadBuffer(null);
}
}
@@ -528,9 +528,9 @@ public abstract class AbstractConnection implements Connection {
}
public synchronized void baseCleanup(String reason) {
if (getReadBuffer() != null) {
this.recycle(getReadBuffer());
this.setReadBuffer(null);
if (getBottomReadBuffer() != null) {
this.recycle(getBottomReadBuffer());
this.setBottomReadBuffer(null);
}
if (service != null && !service.isFakeClosed()) {
@@ -606,6 +606,16 @@ public abstract class AbstractConnection implements Connection {
}
public final ByteBuffer findBottomReadBuffer() {
ByteBuffer tmpReadBuffer = getBottomReadBuffer();
if (tmpReadBuffer == null) {
tmpReadBuffer = allocate(processor.getBufferPool().getChunkSize());
setBottomReadBuffer(tmpReadBuffer);
}
return tmpReadBuffer;
}
/**
* heartbeat of SLB/LVS only create an tcp connection and then close it immediately without any data write to dble .(send reset)
*
@@ -616,6 +626,10 @@ public abstract class AbstractConnection implements Connection {
return tmpService != null && tmpService instanceof MySQLFrontAuthService && ((MySQLFrontAuthService) tmpService).haveNotReceivedMessage();
}
ByteBuffer getReadBuffer() {
return bottomReadBuffer;
}
public abstract void setProcessor(IOProcessor processor);
public void setId(long id) {
@@ -670,8 +684,8 @@ public abstract class AbstractConnection implements Connection {
this.readBufferChunk = readBufferChunk;
}
public ByteBuffer getReadBuffer() {
return this.readBuffer;
public ByteBuffer getBottomReadBuffer() {
return this.bottomReadBuffer;
}
public String getCloseReason() {
@@ -714,7 +728,7 @@ public abstract class AbstractConnection implements Connection {
this.proto = proto;
}
public void setReadBuffer(ByteBuffer readBuffer) {
this.readBuffer = readBuffer;
public void setBottomReadBuffer(ByteBuffer bottomReadBuffer) {
this.bottomReadBuffer = bottomReadBuffer;
}
}

View File

@@ -108,14 +108,14 @@ public class FrontendConnection extends AbstractConnection {
handleSSLData(dataBuffer);
} else {
transferToReadBuffer(dataBuffer);
parentHandle(getReadBuffer());
parentHandle(getBottomReadBuffer());
}
}
private void transferToReadBuffer(ByteBuffer dataBuffer) {
if (!isSupportSSL) return;
dataBuffer.flip();
ByteBuffer readBuffer = findReadBuffer();
ByteBuffer readBuffer = findBottomReadBuffer();
int len = readBuffer.position() + dataBuffer.limit();
if (readBuffer.capacity() < len) {
readBuffer = ensureReadBufferFree(readBuffer, len);
@@ -142,7 +142,7 @@ public class FrontendConnection extends AbstractConnection {
case SSL_CLOSE_PACKET:
if (!result.isHasMorePacket()) {
netReadReachEnd();
final ByteBuffer tmpReadBuffer = getReadBuffer();
final ByteBuffer tmpReadBuffer = getBottomReadBuffer();
if (tmpReadBuffer != null) {
tmpReadBuffer.clear();
}
@@ -212,7 +212,7 @@ public class FrontendConnection extends AbstractConnection {
if (packetData == null)
return;
sslHandler.unwrapAppData(packetData);
parentHandle(getReadBuffer());
parentHandle(getBottomReadBuffer());
}
public void processSSLPacketNotBigEnough(ByteBuffer buffer, int offset, final int pkgLength) {
@@ -315,25 +315,16 @@ public class FrontendConnection extends AbstractConnection {
} else {
dataBuffer.limit(dataBuffer.position());
dataBuffer.position(offset);
setReadBuffer(dataBuffer.compact());
setBottomReadBuffer(dataBuffer.compact());
}
}
public ByteBuffer findReadBuffer() {
ByteBuffer tmpReadBuffer = getReadBuffer();
if (tmpReadBuffer == null) {
tmpReadBuffer = processor.getBufferPool().allocate(processor.getBufferPool().getChunkSize());
setReadBuffer(tmpReadBuffer);
}
return tmpReadBuffer;
}
public ByteBuffer ensureReadBufferFree(ByteBuffer oldBuffer, int expectSize) {
ByteBuffer newBuffer = processor.getBufferPool().allocate(expectSize < 0 ? processor.getBufferPool().getChunkSize() : expectSize);
oldBuffer.flip();
newBuffer.put(oldBuffer);
setReadBuffer(newBuffer);
setBottomReadBuffer(newBuffer);
oldBuffer.clear();
recycle(oldBuffer);
@@ -355,17 +346,28 @@ public class FrontendConnection extends AbstractConnection {
}
}
public ByteBuffer findNetReadBuffer() {
@Override
public ByteBuffer findReadBuffer() {
if (isSupportSSL) {
if (this.netReadBuffer == null) {
netReadBuffer = processor.getBufferPool().allocate(processor.getBufferPool().getChunkSize());
}
return netReadBuffer;
} else {
return super.findNetReadBuffer();
return super.findReadBuffer();
}
}
@Override
ByteBuffer getReadBuffer() {
if (isSupportSSL) {
return netReadBuffer;
} else {
return super.getReadBuffer();
}
}
public boolean isManager() {
return isManager;
}

View File

@@ -129,7 +129,7 @@ public class SSLHandler {
private SSLEngineResult unwrap(SSLEngine engine0, ByteBuffer in) throws SSLException {
int overflows = 0;
ByteBuffer outBuffer = con.findReadBuffer();
ByteBuffer outBuffer = con.findBottomReadBuffer();
for (; ; ) {
SSLEngineResult result = engine0.unwrap(in, outBuffer);
switch (result.getStatus()) {

View File

@@ -45,7 +45,7 @@ public class AIOSocketWR extends SocketWR {
throw new IOException("read from closed channel cause error");
}
try {
ByteBuffer theBuffer = con.findNetReadBuffer();
ByteBuffer theBuffer = con.findReadBuffer();
if (theBuffer.hasRemaining()) {
channel.read(theBuffer, this, AIO_READ_HANDLER);
} else {
@@ -53,7 +53,7 @@ public class AIOSocketWR extends SocketWR {
}
} finally {
//prevent asyncClose and read operation happened Concurrently.
if (con.isClosed() && con.getReadBuffer() != null) {
if (con.isClosed() && con.getBottomReadBuffer() != null) {
con.recycleReadBuffer();
}
}

View File

@@ -349,12 +349,12 @@ public class NIOSocketWR extends SocketWR {
throw new IOException("read from closed channel cause error");
}
try {
ByteBuffer theBuffer = con.findNetReadBuffer();
ByteBuffer theBuffer = con.findReadBuffer();
int got = channel.read(theBuffer);
con.onReadData(got);
} finally {
//prevent asyncClose and read operation happened Concurrently.
if (con.isClosed() && con.getReadBuffer() != null) {
if (con.isClosed() && con.getBottomReadBuffer() != null) {
con.recycleReadBuffer();
}
}

View File

@@ -145,7 +145,7 @@ public final class DbleBackendConnections extends ManagerBaseTable {
}
Optional.ofNullable(service.getSession()).ifPresent(session -> row.put("session_conn_id", session.getSource().getId() + ""));
row.put("conn_estab_time", ((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000) + "");
ByteBuffer bb = c.getReadBuffer();
ByteBuffer bb = c.getBottomReadBuffer();
row.put("conn_recv_buffer", (bb == null ? 0 : bb.capacity()) + "");
row.put("conn_send_task_queue", c.getWriteQueue().size() + "");

View File

@@ -157,7 +157,7 @@ public final class DbleFrontConnections extends ManagerBaseTable {
row.put("conn_net_in", c.getNetInBytes() + "");
row.put("conn_net_out", c.getNetOutBytes() + "");
row.put("conn_estab_time", ((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000) + "");
ByteBuffer bb = c.getReadBuffer();
ByteBuffer bb = c.getBottomReadBuffer();
row.put("conn_recv_buffer", (bb == null ? 0 : bb.capacity()) + "");
row.put("conn_send_task_queue", c.getWriteQueue().size() + "");
row.put("conn_recv_task_queue", c.getFrontEndService().getRecvTaskQueueSize() + "");

View File

@@ -236,7 +236,7 @@ public final class ShowConnection {
row.add(LongUtil.toBytes(c.getNetInBytes()));
row.add(LongUtil.toBytes(c.getNetOutBytes()));
row.add(LongUtil.toBytes((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000L));
ByteBuffer bb = c.getReadBuffer();
ByteBuffer bb = c.getBottomReadBuffer();
row.add(IntegerUtil.toBytes(bb == null ? 0 : bb.capacity()));
row.add(IntegerUtil.toBytes(c.getWriteQueue().size()));
row.add(IntegerUtil.toBytes(service.getRecvTaskQueueSize()));