Merge pull request #3664 from actiontech/fix/1867-2010

fix: improve the connection pool to supplement the connection logic(cherry-pick)
This commit is contained in:
LUA
2023-04-18 13:38:24 +08:00
committed by GitHub
4 changed files with 38 additions and 9 deletions

View File

@@ -74,9 +74,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
try {
ConnectionPoolProvider.getConnGetFrenshLocekAfter();
int waiting = waiters.get();
for (PooledConnection conn : allConnections) {
if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
newPooledEntry(schema, waiters.get());
if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) {
newPooledEntry(schema, waiting, true);
}
return conn;
}
}
@@ -94,11 +97,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
try {
final int waiting = waiterNum;
ConnectionPoolProvider.getConnGetFrenshLocekAfter();
ConnectionPoolProvider.borrowConnectionBefore();
for (PooledConnection conn : allConnections) {
if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
newPooledEntry(schema, waiting - 1);
if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) {
newPooledEntry(schema, waiting, true);
}
return conn;
}
@@ -106,14 +110,18 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
waiterNum = waiters.incrementAndGet();
try {
newPooledEntry(schema, waiterNum);
newPooledEntry(schema, waiterNum, true);
ConnectionPoolProvider.newConnectionAfter();
timeout = timeUnit.toNanos(timeout);
do {
final long start = System.nanoTime();
final PooledConnection bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
if (bagEntry != null) {
bagEntry.getCreateByWaiter().set(false);
}
return bagEntry;
}
@@ -129,7 +137,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
}
private void newPooledEntry(final String schema, final int waiting) {
private void newPooledEntry(final String schema, final int waiting, boolean createByWaiter) {
if (instance.isDisabled() || isClosed.get()) {
return;
}
@@ -138,7 +146,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", alertKey);
if (waiting > 0) {
if (totalConnections.incrementAndGet() <= config.getMaxCon()) {
newConnection(schema, ConnectionPool.this);
newConnection(schema, ConnectionPool.this, createByWaiter);
if (ToResolveContainer.REACH_MAX_CON.contains(alertKey)) {
AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", config.getId(), labels,
ToResolveContainer.REACH_MAX_CON, alertKey);
@@ -187,7 +195,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
for (int i = 0; i < connectionsToAdd; i++) {
// newPooledEntry(schemas[i % schemas.length]);
newPooledEntry(null, 1);
newPooledEntry(null, 1, false);
}
}
@@ -201,6 +209,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
return;
}
LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
conn.lazySet(STATE_NOT_IN_USE);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && conn.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(conn)) {
@@ -217,6 +227,9 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
@Override
public void onCreateFail(PooledConnection conn, Throwable e) {
if (conn == null || conn.getIsCreateFail().compareAndSet(false, true)) {
if (conn != null) {
LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
}
LOGGER.warn("create connection fail " + e.getMessage());
totalConnections.decrementAndGet();
// conn can be null if newChannel crashed (eg SocketException("too many open files"))

View File

@@ -2,6 +2,7 @@ package com.actiontech.dble.backend.pool;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.net.factory.PooledConnectionFactory;
import java.io.IOException;
@@ -31,9 +32,10 @@ public class PoolBase {
}
}
void newConnection(String schema, PooledConnectionListener listener) {
void newConnection(String schema, PooledConnectionListener listener, boolean createByWaiter) {
try {
factory.make(instance, listener, schema);
PooledConnection pooledConnection = factory.make(instance, listener, schema);
pooledConnection.getCreateByWaiter().set(createByWaiter);
} catch (IOException ioe) {
listener.onCreateFail(null, ioe);
}

View File

@@ -17,5 +17,13 @@ public final class ConnectionPoolProvider {
}
public static void newConnectionAfter() {
}
public static void borrowConnectionBefore() {
}
}

View File

@@ -31,6 +31,8 @@ public abstract class PooledConnection extends AbstractConnection {
protected long connectionTimeout;
private AtomicBoolean createByWaiter = new AtomicBoolean(false);
static {
LAST_ACCESS_COMPARABLE = Comparator.comparingLong(entryOne -> entryOne.lastTime);
}
@@ -119,4 +121,8 @@ public abstract class PooledConnection extends AbstractConnection {
public AtomicBoolean getIsCreateFail() {
return isCreateFail;
}
public AtomicBoolean getCreateByWaiter() {
return createByWaiter;
}
}