From 445a86db9bbc4f5fdb240fdbd839a88dac6743a6 Mon Sep 17 00:00:00 2001 From: LUA Date: Mon, 24 Oct 2022 13:18:18 +0800 Subject: [PATCH] fix: close the connection that is being created and the creation time exceeds the connectionTimeout value --- .../dble/backend/pool/ConnectionPool.java | 27 ++++++++++++++----- .../dble/backend/pool/PoolBase.java | 6 +++-- .../provider/ConnectionPoolProvider.java | 8 ++++++ .../dble/net/connection/PooledConnection.java | 6 +++++ 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java index 4e8ecefb0..982e96b33 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java +++ b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java @@ -74,9 +74,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } try { ConnectionPoolProvider.getConnGetFrenshLocekAfter(); + int waiting = waiters.get(); for (PooledConnection conn : allConnections) { if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - newPooledEntry(schema, waiters.get()); + if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) { + newPooledEntry(schema, waiting, true); + } return conn; } } @@ -94,11 +97,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener try { final int waiting = waiterNum; ConnectionPoolProvider.getConnGetFrenshLocekAfter(); + ConnectionPoolProvider.borrowConnectionBefore(); for (PooledConnection conn : allConnections) { if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiter's connection, request another bag add. - if (waiting > 1) { - newPooledEntry(schema, waiting - 1); + if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) { + newPooledEntry(schema, waiting, true); } return conn; } @@ -106,14 +110,18 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener waiterNum = waiters.incrementAndGet(); try { - newPooledEntry(schema, waiterNum); + newPooledEntry(schema, waiterNum, true); + ConnectionPoolProvider.newConnectionAfter(); timeout = timeUnit.toNanos(timeout); do { final long start = System.nanoTime(); final PooledConnection bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { + if (bagEntry != null) { + bagEntry.getCreateByWaiter().set(false); + } return bagEntry; } @@ -129,7 +137,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } } - private void newPooledEntry(final String schema, final int waiting) { + private void newPooledEntry(final String schema, final int waiting, boolean createByWaiter) { if (instance.isDisabled() || isClosed.get()) { return; } @@ -138,7 +146,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener Map labels = AlertUtil.genSingleLabel("dbInstance", alertKey); if (waiting > 0) { if (totalConnections.incrementAndGet() <= config.getMaxCon()) { - newConnection(schema, ConnectionPool.this); + newConnection(schema, ConnectionPool.this, createByWaiter); if (ToResolveContainer.REACH_MAX_CON.contains(alertKey)) { AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", config.getId(), labels, ToResolveContainer.REACH_MAX_CON, alertKey); @@ -187,7 +195,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } for (int i = 0; i < connectionsToAdd; i++) { // newPooledEntry(schemas[i % schemas.length]); - newPooledEntry(null, 1); + newPooledEntry(null, 1, false); } } @@ -201,6 +209,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener return; } + LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn); + conn.lazySet(STATE_NOT_IN_USE); // spin until a thread takes it or none are waiting while (waiters.get() > 0 && conn.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(conn)) { @@ -217,6 +227,9 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener @Override public void onCreateFail(PooledConnection conn, Throwable e) { if (conn == null || conn.getIsCreateFail().compareAndSet(false, true)) { + if (conn != null) { + LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn); + } LOGGER.warn("create connection fail " + e.getMessage()); totalConnections.decrementAndGet(); // conn can be null if newChannel crashed (eg SocketException("too many open files")) diff --git a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java index ec640f168..50e70525a 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java +++ b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java @@ -2,6 +2,7 @@ package com.actiontech.dble.backend.pool; import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.net.factory.PooledConnectionFactory; import java.io.IOException; @@ -31,9 +32,10 @@ public class PoolBase { } } - void newConnection(String schema, PooledConnectionListener listener) { + void newConnection(String schema, PooledConnectionListener listener, boolean createByWaiter) { try { - factory.make(instance, listener, schema); + PooledConnection pooledConnection = factory.make(instance, listener, schema); + pooledConnection.getCreateByWaiter().set(createByWaiter); } catch (IOException ioe) { listener.onCreateFail(null, ioe); } diff --git a/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java b/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java index eabe0c84c..eb5b75ea9 100644 --- a/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java +++ b/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java @@ -17,5 +17,13 @@ public final class ConnectionPoolProvider { } + public static void newConnectionAfter() { + + } + + public static void borrowConnectionBefore() { + + } + } diff --git a/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java b/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java index 14c95eb9d..587ba8369 100644 --- a/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java @@ -31,6 +31,8 @@ public abstract class PooledConnection extends AbstractConnection { protected long connectionTimeout; + private AtomicBoolean createByWaiter = new AtomicBoolean(false); + static { LAST_ACCESS_COMPARABLE = Comparator.comparingLong(entryOne -> entryOne.lastTime); } @@ -119,4 +121,8 @@ public abstract class PooledConnection extends AbstractConnection { public AtomicBoolean getIsCreateFail() { return isCreateFail; } + + public AtomicBoolean getCreateByWaiter() { + return createByWaiter; + } }