From 46d36e6ffdeec951c6d093f1a31d1eaeb04afb89 Mon Sep 17 00:00:00 2001 From: Rico Date: Fri, 11 Nov 2022 13:27:15 +0800 Subject: [PATCH] inner-1972:fix gmssl (#3465) Signed-off-by: dcy10000 (cherry picked from commit 822336d796d9d23f0aad48a38d6f13ac19cb0360) Signed-off-by: dcy10000 --- .../net/connection/AbstractConnection.java | 48 ++++++++++++------- .../net/connection/FrontendConnection.java | 36 +++++++------- .../dble/net/connection/SSLHandler.java | 2 +- .../dble/net/impl/aio/AIOSocketWR.java | 4 +- .../dble/net/impl/nio/NIOSocketWR.java | 4 +- .../tables/DbleBackendConnections.java | 2 +- .../tables/DbleFrontConnections.java | 2 +- .../manager/response/ShowConnection.java | 2 +- 8 files changed, 58 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java b/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java index a5cb5d0aa..c67c08361 100644 --- a/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java @@ -65,7 +65,7 @@ public abstract class AbstractConnection implements Connection { protected volatile IOProcessor processor; protected volatile String closeReason; - private volatile ByteBuffer readBuffer; + private volatile ByteBuffer bottomReadBuffer; protected volatile boolean frontWriteFlowControlled = false; protected int readBufferChunk; protected final long startupTime; @@ -316,7 +316,7 @@ public abstract class AbstractConnection implements Connection { } buffer.limit(buffer.position()); buffer.position(offset); - this.setReadBuffer(buffer.compact()); + this.setBottomReadBuffer(buffer.compact()); } public void ensureFreeSpaceOfReadBuffer(ByteBuffer buffer, @@ -326,7 +326,7 @@ public abstract class AbstractConnection implements Connection { lastLargeMessageTime = TimeUtil.currentTimeMillis(); buffer.position(offset); newBuffer.put(buffer); - setReadBuffer(newBuffer); + setBottomReadBuffer(newBuffer); recycle(buffer); } else { if (offset != 0) { @@ -342,14 +342,14 @@ public abstract class AbstractConnection implements Connection { // if cur buffer is temper none direct byte buffer and not // received large message in recent 30 seconds // then change to direct buffer for performance - ByteBuffer localReadBuffer = this.getReadBuffer(); + ByteBuffer localReadBuffer = this.getBottomReadBuffer(); if (localReadBuffer != null && !localReadBuffer.isDirect() && lastLargeMessageTime < lastReadTime - 30 * 1000L) { // used temp heap if (LOGGER.isDebugEnabled()) { LOGGER.debug("change to direct con read buffer ,cur temp buf size :" + localReadBuffer.capacity()); } recycle(localReadBuffer); - this.setReadBuffer(processor.getBufferPool().allocate(readBufferChunk)); + this.setBottomReadBuffer(processor.getBufferPool().allocate(readBufferChunk)); } else { if (localReadBuffer != null) { IODelayProvider.inReadReachEnd(); @@ -502,21 +502,21 @@ public abstract class AbstractConnection implements Connection { return isClosed.get(); } - public ByteBuffer findNetReadBuffer() { - ByteBuffer tmpReadBuffer = getReadBuffer(); + public ByteBuffer findReadBuffer() { + ByteBuffer tmpReadBuffer = getBottomReadBuffer(); if (tmpReadBuffer == null) { tmpReadBuffer = processor.getBufferPool().allocate(processor.getBufferPool().getChunkSize()); - setReadBuffer(tmpReadBuffer); + setBottomReadBuffer(tmpReadBuffer); } return tmpReadBuffer; } public synchronized void recycleReadBuffer() { - final ByteBuffer tmpReadBuffer = getReadBuffer(); + final ByteBuffer tmpReadBuffer = getBottomReadBuffer(); if (tmpReadBuffer != null) { this.recycle(tmpReadBuffer); - this.setReadBuffer(null); + this.setBottomReadBuffer(null); } } @@ -528,9 +528,9 @@ public abstract class AbstractConnection implements Connection { } public synchronized void baseCleanup(String reason) { - if (getReadBuffer() != null) { - this.recycle(getReadBuffer()); - this.setReadBuffer(null); + if (getBottomReadBuffer() != null) { + this.recycle(getBottomReadBuffer()); + this.setBottomReadBuffer(null); } if (service != null && !service.isFakeClosed()) { @@ -606,6 +606,16 @@ public abstract class AbstractConnection implements Connection { } + public final ByteBuffer findBottomReadBuffer() { + ByteBuffer tmpReadBuffer = getBottomReadBuffer(); + if (tmpReadBuffer == null) { + tmpReadBuffer = allocate(processor.getBufferPool().getChunkSize()); + setBottomReadBuffer(tmpReadBuffer); + } + return tmpReadBuffer; + } + + /** * heartbeat of SLB/LVS only create an tcp connection and then close it immediately without any data write to dble .(send reset) * @@ -616,6 +626,10 @@ public abstract class AbstractConnection implements Connection { return tmpService != null && tmpService instanceof MySQLFrontAuthService && ((MySQLFrontAuthService) tmpService).haveNotReceivedMessage(); } + ByteBuffer getReadBuffer() { + return bottomReadBuffer; + } + public abstract void setProcessor(IOProcessor processor); public void setId(long id) { @@ -670,8 +684,8 @@ public abstract class AbstractConnection implements Connection { this.readBufferChunk = readBufferChunk; } - public ByteBuffer getReadBuffer() { - return this.readBuffer; + public ByteBuffer getBottomReadBuffer() { + return this.bottomReadBuffer; } public String getCloseReason() { @@ -714,7 +728,7 @@ public abstract class AbstractConnection implements Connection { this.proto = proto; } - public void setReadBuffer(ByteBuffer readBuffer) { - this.readBuffer = readBuffer; + public void setBottomReadBuffer(ByteBuffer bottomReadBuffer) { + this.bottomReadBuffer = bottomReadBuffer; } } diff --git a/src/main/java/com/actiontech/dble/net/connection/FrontendConnection.java b/src/main/java/com/actiontech/dble/net/connection/FrontendConnection.java index 4ffe3166f..44a6946fb 100644 --- a/src/main/java/com/actiontech/dble/net/connection/FrontendConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/FrontendConnection.java @@ -108,14 +108,14 @@ public class FrontendConnection extends AbstractConnection { handleSSLData(dataBuffer); } else { transferToReadBuffer(dataBuffer); - parentHandle(getReadBuffer()); + parentHandle(getBottomReadBuffer()); } } private void transferToReadBuffer(ByteBuffer dataBuffer) { if (!isSupportSSL) return; dataBuffer.flip(); - ByteBuffer readBuffer = findReadBuffer(); + ByteBuffer readBuffer = findBottomReadBuffer(); int len = readBuffer.position() + dataBuffer.limit(); if (readBuffer.capacity() < len) { readBuffer = ensureReadBufferFree(readBuffer, len); @@ -142,7 +142,7 @@ public class FrontendConnection extends AbstractConnection { case SSL_CLOSE_PACKET: if (!result.isHasMorePacket()) { netReadReachEnd(); - final ByteBuffer tmpReadBuffer = getReadBuffer(); + final ByteBuffer tmpReadBuffer = getBottomReadBuffer(); if (tmpReadBuffer != null) { tmpReadBuffer.clear(); } @@ -212,7 +212,7 @@ public class FrontendConnection extends AbstractConnection { if (packetData == null) return; sslHandler.unwrapAppData(packetData); - parentHandle(getReadBuffer()); + parentHandle(getBottomReadBuffer()); } public void processSSLPacketNotBigEnough(ByteBuffer buffer, int offset, final int pkgLength) { @@ -315,25 +315,16 @@ public class FrontendConnection extends AbstractConnection { } else { dataBuffer.limit(dataBuffer.position()); dataBuffer.position(offset); - setReadBuffer(dataBuffer.compact()); + setBottomReadBuffer(dataBuffer.compact()); } } - public ByteBuffer findReadBuffer() { - ByteBuffer tmpReadBuffer = getReadBuffer(); - if (tmpReadBuffer == null) { - tmpReadBuffer = processor.getBufferPool().allocate(processor.getBufferPool().getChunkSize()); - setReadBuffer(tmpReadBuffer); - } - return tmpReadBuffer; - } - public ByteBuffer ensureReadBufferFree(ByteBuffer oldBuffer, int expectSize) { ByteBuffer newBuffer = processor.getBufferPool().allocate(expectSize < 0 ? processor.getBufferPool().getChunkSize() : expectSize); oldBuffer.flip(); newBuffer.put(oldBuffer); - setReadBuffer(newBuffer); + setBottomReadBuffer(newBuffer); oldBuffer.clear(); recycle(oldBuffer); @@ -355,17 +346,28 @@ public class FrontendConnection extends AbstractConnection { } } - public ByteBuffer findNetReadBuffer() { + @Override + public ByteBuffer findReadBuffer() { if (isSupportSSL) { if (this.netReadBuffer == null) { netReadBuffer = processor.getBufferPool().allocate(processor.getBufferPool().getChunkSize()); } return netReadBuffer; } else { - return super.findNetReadBuffer(); + return super.findReadBuffer(); } } + @Override + ByteBuffer getReadBuffer() { + if (isSupportSSL) { + return netReadBuffer; + } else { + return super.getReadBuffer(); + } + } + + public boolean isManager() { return isManager; } diff --git a/src/main/java/com/actiontech/dble/net/connection/SSLHandler.java b/src/main/java/com/actiontech/dble/net/connection/SSLHandler.java index 6d543ff44..a8f85aa5b 100644 --- a/src/main/java/com/actiontech/dble/net/connection/SSLHandler.java +++ b/src/main/java/com/actiontech/dble/net/connection/SSLHandler.java @@ -129,7 +129,7 @@ public class SSLHandler { private SSLEngineResult unwrap(SSLEngine engine0, ByteBuffer in) throws SSLException { int overflows = 0; - ByteBuffer outBuffer = con.findReadBuffer(); + ByteBuffer outBuffer = con.findBottomReadBuffer(); for (; ; ) { SSLEngineResult result = engine0.unwrap(in, outBuffer); switch (result.getStatus()) { diff --git a/src/main/java/com/actiontech/dble/net/impl/aio/AIOSocketWR.java b/src/main/java/com/actiontech/dble/net/impl/aio/AIOSocketWR.java index 65e2ecde9..3247040a3 100644 --- a/src/main/java/com/actiontech/dble/net/impl/aio/AIOSocketWR.java +++ b/src/main/java/com/actiontech/dble/net/impl/aio/AIOSocketWR.java @@ -45,7 +45,7 @@ public class AIOSocketWR extends SocketWR { throw new IOException("read from closed channel cause error"); } try { - ByteBuffer theBuffer = con.findNetReadBuffer(); + ByteBuffer theBuffer = con.findReadBuffer(); if (theBuffer.hasRemaining()) { channel.read(theBuffer, this, AIO_READ_HANDLER); } else { @@ -53,7 +53,7 @@ public class AIOSocketWR extends SocketWR { } } finally { //prevent asyncClose and read operation happened Concurrently. - if (con.isClosed() && con.getReadBuffer() != null) { + if (con.isClosed() && con.getBottomReadBuffer() != null) { con.recycleReadBuffer(); } } diff --git a/src/main/java/com/actiontech/dble/net/impl/nio/NIOSocketWR.java b/src/main/java/com/actiontech/dble/net/impl/nio/NIOSocketWR.java index 667a487de..19795d6f2 100644 --- a/src/main/java/com/actiontech/dble/net/impl/nio/NIOSocketWR.java +++ b/src/main/java/com/actiontech/dble/net/impl/nio/NIOSocketWR.java @@ -349,12 +349,12 @@ public class NIOSocketWR extends SocketWR { throw new IOException("read from closed channel cause error"); } try { - ByteBuffer theBuffer = con.findNetReadBuffer(); + ByteBuffer theBuffer = con.findReadBuffer(); int got = channel.read(theBuffer); con.onReadData(got); } finally { //prevent asyncClose and read operation happened Concurrently. - if (con.isClosed() && con.getReadBuffer() != null) { + if (con.isClosed() && con.getBottomReadBuffer() != null) { con.recycleReadBuffer(); } } diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBackendConnections.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBackendConnections.java index 06dace2da..92687bf31 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBackendConnections.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleBackendConnections.java @@ -145,7 +145,7 @@ public final class DbleBackendConnections extends ManagerBaseTable { } Optional.ofNullable(service.getSession()).ifPresent(session -> row.put("session_conn_id", session.getSource().getId() + "")); row.put("conn_estab_time", ((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000) + ""); - ByteBuffer bb = c.getReadBuffer(); + ByteBuffer bb = c.getBottomReadBuffer(); row.put("conn_recv_buffer", (bb == null ? 0 : bb.capacity()) + ""); row.put("conn_send_task_queue", c.getWriteQueue().size() + ""); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFrontConnections.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFrontConnections.java index da2ee2a3a..423474ea8 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFrontConnections.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFrontConnections.java @@ -157,7 +157,7 @@ public final class DbleFrontConnections extends ManagerBaseTable { row.put("conn_net_in", c.getNetInBytes() + ""); row.put("conn_net_out", c.getNetOutBytes() + ""); row.put("conn_estab_time", ((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000) + ""); - ByteBuffer bb = c.getReadBuffer(); + ByteBuffer bb = c.getBottomReadBuffer(); row.put("conn_recv_buffer", (bb == null ? 0 : bb.capacity()) + ""); row.put("conn_send_task_queue", c.getWriteQueue().size() + ""); row.put("conn_recv_task_queue", c.getFrontEndService().getRecvTaskQueueSize() + ""); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java index 3b0a2cde1..ea57cf1f2 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java @@ -236,7 +236,7 @@ public final class ShowConnection { row.add(LongUtil.toBytes(c.getNetInBytes())); row.add(LongUtil.toBytes(c.getNetOutBytes())); row.add(LongUtil.toBytes((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000L)); - ByteBuffer bb = c.getReadBuffer(); + ByteBuffer bb = c.getBottomReadBuffer(); row.add(IntegerUtil.toBytes(bb == null ? 0 : bb.capacity())); row.add(IntegerUtil.toBytes(c.getWriteQueue().size())); row.add(IntegerUtil.toBytes(service.getRecvTaskQueueSize()));