From 690d73669625db918d6497bc6a33fcd7ffb8b235 Mon Sep 17 00:00:00 2001 From: LUA Date: Mon, 24 Oct 2022 13:18:18 +0800 Subject: [PATCH] fix: close the connection that is being created and the creation time exceeds the connectionTimeout value --- .../backend/heartbeat/MySQLHeartbeat.java | 3 ++- .../dble/backend/pool/ConnectionPool.java | 27 ++++++++++++++----- .../dble/backend/pool/PoolBase.java | 6 +++-- .../provider/ConnectionPoolProvider.java | 8 ++++++ .../dble/net/connection/PooledConnection.java | 6 +++++ 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index 4dc3c8e93..60ce13e41 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -224,7 +224,8 @@ public class MySQLHeartbeat { AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels); } //after the heartbeat changes from failure to success, it needs to be expanded immediately - if (source.getTotalConnections() == 0) { + if (source.getTotalConnections() == 0 && !status.equals(MySQLHeartbeatStatus.INIT) && !status.equals(MySQLHeartbeatStatus.OK)) { + LOGGER.debug("[updatePoolCapacity] heartbeat to [{}] setOk, previous status is {}", source, status); source.updatePoolCapacity(); } if (isStop) { diff --git a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java index 73a82f49f..03998d40c 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java +++ b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java @@ -74,9 +74,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } try { ConnectionPoolProvider.getConnGetFrenshLocekAfter(); + int waiting = waiters.get(); for (PooledConnection conn : allConnections) { if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - newPooledEntry(schema, waiters.get()); + if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) { + newPooledEntry(schema, waiting, true); + } return conn; } } @@ -94,11 +97,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener try { final int waiting = waiterNum; ConnectionPoolProvider.getConnGetFrenshLocekAfter(); + ConnectionPoolProvider.borrowConnectionBefore(); for (PooledConnection conn : allConnections) { if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiter's connection, request another bag add. - if (waiting > 1) { - newPooledEntry(schema, waiting - 1); + if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) { + newPooledEntry(schema, waiting, true); } return conn; } @@ -106,14 +110,18 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener waiterNum = waiters.incrementAndGet(); try { - newPooledEntry(schema, waiterNum); + newPooledEntry(schema, waiterNum, true); + ConnectionPoolProvider.newConnectionAfter(); timeout = timeUnit.toNanos(timeout); do { final long start = System.nanoTime(); final PooledConnection bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { + if (bagEntry != null) { + bagEntry.getCreateByWaiter().set(false); + } return bagEntry; } @@ -129,7 +137,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } } - private void newPooledEntry(final String schema, final int waiting) { + private void newPooledEntry(final String schema, final int waiting, boolean createByWaiter) { if (instance.isDisabled() || isClosed.get()) { return; } @@ -138,7 +146,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener Map labels = AlertUtil.genSingleLabel("dbInstance", alertKey); if (waiting > 0) { if (totalConnections.incrementAndGet() <= config.getMaxCon()) { - newConnection(schema, ConnectionPool.this); + newConnection(schema, ConnectionPool.this, createByWaiter); if (ToResolveContainer.REACH_MAX_CON.contains(alertKey)) { AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", config.getId(), labels, ToResolveContainer.REACH_MAX_CON, alertKey); @@ -188,7 +196,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } for (int i = 0; i < connectionsToAdd; i++) { // newPooledEntry(schemas[i % schemas.length]); - newPooledEntry(null, 1); + newPooledEntry(null, 1, false); } } @@ -202,6 +210,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener return; } + LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn); + conn.lazySet(STATE_NOT_IN_USE); // spin until a thread takes it or none are waiting while (waiters.get() > 0 && conn.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(conn)) { @@ -218,6 +228,9 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener @Override public void onCreateFail(PooledConnection conn, Throwable e) { if (conn == null || conn.getIsCreateFail().compareAndSet(false, true)) { + if (conn != null) { + LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn); + } LOGGER.warn("create connection fail " + e.getMessage()); totalConnections.decrementAndGet(); // conn can be null if newChannel crashed (eg SocketException("too many open files")) diff --git a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java index ec640f168..50e70525a 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java +++ b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java @@ -2,6 +2,7 @@ package com.actiontech.dble.backend.pool; import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.net.factory.PooledConnectionFactory; import java.io.IOException; @@ -31,9 +32,10 @@ public class PoolBase { } } - void newConnection(String schema, PooledConnectionListener listener) { + void newConnection(String schema, PooledConnectionListener listener, boolean createByWaiter) { try { - factory.make(instance, listener, schema); + PooledConnection pooledConnection = factory.make(instance, listener, schema); + pooledConnection.getCreateByWaiter().set(createByWaiter); } catch (IOException ioe) { listener.onCreateFail(null, ioe); } diff --git a/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java b/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java index eabe0c84c..eb5b75ea9 100644 --- a/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java +++ b/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java @@ -17,5 +17,13 @@ public final class ConnectionPoolProvider { } + public static void newConnectionAfter() { + + } + + public static void borrowConnectionBefore() { + + } + } diff --git a/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java b/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java index bcee08b32..d39bd9ba7 100644 --- a/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java @@ -29,6 +29,8 @@ public abstract class PooledConnection extends AbstractConnection { public static final Comparator LAST_ACCESS_COMPARABLE; + private AtomicBoolean createByWaiter = new AtomicBoolean(false); + static { LAST_ACCESS_COMPARABLE = Comparator.comparingLong(entryOne -> entryOne.lastTime); } @@ -117,4 +119,8 @@ public abstract class PooledConnection extends AbstractConnection { public AtomicBoolean getIsCreateFail() { return isCreateFail; } + + public AtomicBoolean getCreateByWaiter() { + return createByWaiter; + } }