idleList = new ArrayList<>(allConnections.size());
+ for (final BackendConnection entry : allConnections) {
+ if (entry.getState() == STATE_NOT_IN_USE) {
+ idleList.add(entry);
+ }
+ }
+
+ int removable = idleList.size() - config.getMinCon();
+
+ // Sort pool entries on lastAccessed
+ idleList.sort(LAST_ACCESS_COMPARABLE);
+
+ logPoolState("before cleanup ");
+ for (BackendConnection conn : idleList) {
+ if (removable > 0 && System.currentTimeMillis() - conn.getLastTime() > getIdleTimeout() &&
+ conn.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED)) {
+ conn.close("connection has passed idleTimeout");
+ removable--;
+ } else if (getTestWhileIdle() && conn.compareAndSet(STATE_NOT_IN_USE, STATE_HEARTBEAT)) {
+ ConnectionHeartBeatHandler heartBeatHandler = new ConnectionHeartBeatHandler(conn, false, this);
+ heartBeatHandler.ping(getConnectionHeartbeatTimeout());
+ }
+ }
+
+ }
+
+ public final int getThreadsAwaitingConnection() {
+ return synchronizer.getQueueLength();
+ }
+
+ /**
+ * Starts the evictor with the given delay. If there is an evictor
+ * running when this method is called, it is stopped and replaced with a
+ * new evictor with the specified delay.
+ *
+ * This method needs to be final, since it is called from a constructor.
+ * See POOL-195.
+ */
+ public void startEvictor() {
+ if (evictor != null) {
+ EvictionTimer.cancel(evictor, getEvictorShutdownTimeoutMillis(), TimeUnit.MILLISECONDS);
+ }
+ evictor = new Evictor();
+ EvictionTimer.schedule(evictor, 0, getTimeBetweenEvictionRunsMillis());
+ }
+
+ /**
+ * Stops the evictor.
+ */
+ public void stopEvictor() {
+ EvictionTimer.cancel(evictor, getEvictorShutdownTimeoutMillis(), TimeUnit.MILLISECONDS);
+ evictor = null;
+ }
+
+ /**
+ * Log the current pool state at debug level.
+ *
+ * @param prefix an optional prefix to prepend the log message
+ */
+ private void logPoolState(String... prefix) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} db instance[{}] stats (total={}, active={}, idle={}, idleTest={} waiting={})", new Object[]{(prefix.length > 0 ? prefix[0] : ""), config.getInstanceName(),
+ allConnections.size() - getCount(STATE_REMOVED), getCount(STATE_IN_USE), getCount(STATE_NOT_IN_USE), getCount(STATE_HEARTBEAT), getThreadsAwaitingConnection()});
+ }
+ }
+
+ /**
+ * The idle object evictor.
+ */
+ class Evictor implements Runnable {
+
+ private ScheduledFuture> scheduledFuture;
+
+ /**
+ * Run pool maintenance. Evict objects qualifying for eviction and then
+ * ensure that the minimum number of idle instances are available.
+ * Since the Timer that invokes Evictors is shared for all Pools but
+ * pools may exist in different class loaders, the Evictor ensures that
+ * any actions taken are under the class loader of the factory
+ * associated with the pool.
+ */
+ @Override
+ public void run() {
+
+ if (!instance.isAlive()) {
+ return;
+ }
+
+ final ClassLoader savedClassLoader =
+ Thread.currentThread().getContextClassLoader();
+ try {
+ if (factoryClassLoader != null) {
+ // Set the class loader for the factory
+ final ClassLoader cl = factoryClassLoader.get();
+ if (cl == null) {
+ // The pool has been dereferenced and the class loader
+ // GC'd. Cancel this timer so the pool can be GC'd as
+ // well.
+ cancel();
+ return;
+ }
+ Thread.currentThread().setContextClassLoader(cl);
+ }
+
+ // Evict from the pool
+ evict();
+
+ // Try to maintain minimum connections
+ fillPool();
+ } finally {
+ // Restore the previous CCL
+ Thread.currentThread().setContextClassLoader(savedClassLoader);
+ }
+ }
+
+ /**
+ * Sets the scheduled future.
+ *
+ * @param scheduledFuture the scheduled future.
+ */
+ void setScheduledFuture(final ScheduledFuture> scheduledFuture) {
+ this.scheduledFuture = scheduledFuture;
+ }
+
+ /**
+ * Cancels the scheduled future.
+ */
+ void cancel() {
+ scheduledFuture.cancel(false);
+ }
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/pool/EvictionTimer.java b/src/main/java/com/actiontech/dble/backend/pool/EvictionTimer.java
new file mode 100644
index 000000000..96e629e78
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/EvictionTimer.java
@@ -0,0 +1,101 @@
+package com.actiontech.dble.backend.pool;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+final class EvictionTimer {
+
+ /**
+ * Executor instance
+ */
+ private static ScheduledThreadPoolExecutor executor; //@GuardedBy("EvictionTimer.class")
+
+ /**
+ * Prevents instantiation
+ */
+ private EvictionTimer() {
+ // Hide the default constructor
+ }
+
+
+ /**
+ * @since 2.4.3
+ */
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("EvictionTimer []");
+ return builder.toString();
+ }
+
+
+ /**
+ * Adds the specified eviction task to the timer. Tasks that are added with a
+ * call to this method *must* call {@link #cancel()} to cancel the
+ * task to prevent memory and/or thread leaks in application server
+ * environments.
+ *
+ * @param task Task to be scheduled.
+ * @param delay Delay in milliseconds before task is executed.
+ * @param period Time in milliseconds between executions.
+ */
+ static synchronized void schedule(
+ final ConnectionPool.Evictor task, final long delay, final long period) {
+ if (null == executor) {
+ executor = new ScheduledThreadPoolExecutor(1, new EvictorThreadFactory());
+ executor.setRemoveOnCancelPolicy(true);
+ }
+ final ScheduledFuture> scheduledFuture =
+ executor.scheduleWithFixedDelay(task, delay, period, TimeUnit.MILLISECONDS);
+ task.setScheduledFuture(scheduledFuture);
+ }
+
+ /**
+ * Removes the specified eviction task from the timer.
+ *
+ * @param evictor Task to be cancelled.
+ * @param timeout If the associated executor is no longer required, how
+ * long should this thread wait for the executor to
+ * terminate?
+ * @param unit The units for the specified timeout.
+ */
+ static synchronized void cancel(
+ final ConnectionPool.Evictor evictor, final long timeout, final TimeUnit unit) {
+ if (evictor != null) {
+ evictor.cancel();
+ }
+ if (executor != null && executor.getQueue().isEmpty()) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(timeout, unit);
+ } catch (final InterruptedException e) {
+ // Swallow
+ // Significant API changes would be required to propagate this
+ }
+ executor.setCorePoolSize(0);
+ executor = null;
+ }
+ }
+
+ /**
+ * Thread factory that creates a daemon thread, with the context class loader from this class.
+ */
+ private static class EvictorThreadFactory implements ThreadFactory {
+
+ @Override
+ public Thread newThread(final Runnable runnable) {
+ final Thread thread = new Thread(null, runnable, "connection-pool-evictor-thread");
+ thread.setDaemon(true); // POOL-363 - Required for applications using Runtime.addShutdownHook().
+ AccessController.doPrivileged((PrivilegedAction) () -> {
+ thread.setContextClassLoader(EvictorThreadFactory.class.getClassLoader());
+ return null;
+ });
+
+ return thread;
+ }
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java
new file mode 100644
index 000000000..168867169
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java
@@ -0,0 +1,177 @@
+package com.actiontech.dble.backend.pool;
+
+import com.actiontech.dble.DbleServer;
+import com.actiontech.dble.backend.BackendConnection;
+import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
+import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
+import com.actiontech.dble.backend.mysql.nio.MySQLConnectionAuthenticator;
+import com.actiontech.dble.backend.mysql.nio.MySQLConnectionListener;
+import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
+import com.actiontech.dble.config.model.db.DbInstanceConfig;
+import com.actiontech.dble.net.NIOConnector;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.NetworkChannel;
+import java.nio.channels.SocketChannel;
+
+public class PoolBase {
+
+ protected final DbInstanceConfig config;
+ protected final PhysicalDbInstance instance;
+
+ private final long connectionTimeout;
+ private final long connectionHeartbeatTimeout;
+ private final boolean testOnCreate;
+ private final boolean testOnBorrow;
+ private final boolean testOnReturn;
+ private final boolean testWhileIdle;
+ private final long timeBetweenEvictionRunsMillis;
+ private final int numTestsPerEvictionRun;
+ private final long evictorShutdownTimeoutMillis;
+ private final long idleTimeout;
+
+ public PoolBase(DbInstanceConfig dbConfig, PhysicalDbInstance instance) {
+ this.config = dbConfig;
+ this.instance = instance;
+
+ PoolConfig poolConfig = dbConfig.getPoolConfig();
+ this.testOnBorrow = poolConfig.getTestOnBorrow();
+ this.testOnCreate = poolConfig.getTestOnCreate();
+ this.testOnReturn = poolConfig.getTestOnReturn();
+ this.testWhileIdle = poolConfig.getTestWhileIdle();
+ this.connectionHeartbeatTimeout = poolConfig.getConnectionHeartbeatTimeout();
+ this.connectionTimeout = poolConfig.getConnectionTimeout();
+ this.timeBetweenEvictionRunsMillis = poolConfig.getTimeBetweenEvictionRunsMillis();
+ this.numTestsPerEvictionRun = poolConfig.getNumTestsPerEvictionRun();
+ this.evictorShutdownTimeoutMillis = poolConfig.getEvictorShutdownTimeoutMillis();
+ this.idleTimeout = poolConfig.getIdleTimeout();
+ }
+
+ /**
+ * only for heartbeat
+ *
+ * @param handler
+ * @return
+ */
+ public void newConnection(String schema, ResponseHandler handler) {
+ try {
+ NetworkChannel channel = openSocketChannel();
+ MySQLConnection conn = new MySQLConnection(channel, config, instance.isReadInstance(), instance.isAutocommitSynced(), instance.isIsolationSynced());
+ conn.setSocketParams(false);
+ conn.setSchema(schema);
+ conn.setHandler(new MySQLConnectionAuthenticator(conn, new MySQLConnectionListener() {
+ @Override
+ public void onCreateSuccess(BackendConnection conn) {
+ handler.connectionAcquired(conn);
+ }
+
+ @Override
+ public void onCreateFail(BackendConnection conn, Throwable e) {
+ handler.connectionError(e, conn);
+ }
+
+ @Override
+ public void onHeartbeatSuccess(BackendConnection conn) {
+ }
+
+ }));
+
+ if (channel instanceof AsynchronousSocketChannel) {
+ ((AsynchronousSocketChannel) channel).connect(
+ new InetSocketAddress(config.getIp(), config.getPort()), conn,
+ (CompletionHandler) DbleServer.getInstance().getConnector());
+ } else {
+ ((NIOConnector) DbleServer.getInstance().getConnector()).postConnect(conn);
+ }
+ } catch (IOException ioe) {
+ handler.connectionError(ioe, null);
+ }
+ }
+
+ BackendConnection newConnection(String schema, MySQLConnectionListener listener) {
+ try {
+ NetworkChannel channel = openSocketChannel();
+
+ MySQLConnection conn = new MySQLConnection(channel, config, instance.isReadInstance(), instance.isAutocommitSynced(), instance.isIsolationSynced());
+ conn.setSocketParams(false);
+ conn.setSchema(schema);
+ conn.setHandler(new MySQLConnectionAuthenticator(conn, listener));
+ conn.setDbInstance(instance);
+
+ if (channel instanceof AsynchronousSocketChannel) {
+ ((AsynchronousSocketChannel) channel).connect(
+ new InetSocketAddress(config.getIp(), config.getPort()), conn,
+ (CompletionHandler) DbleServer.getInstance().getConnector());
+ } else {
+ ((NIOConnector) DbleServer.getInstance().getConnector()).postConnect(conn);
+ }
+ return conn;
+
+ } catch (IOException ioe) {
+ listener.onCreateFail(null, ioe);
+ return null;
+ }
+ }
+
+ private NetworkChannel openSocketChannel() throws IOException {
+ NetworkChannel channel;
+ if (DbleServer.getInstance().isAIO()) {
+ channel = AsynchronousSocketChannel.open(DbleServer.getInstance().getNextAsyncChannelGroup());
+ } else {
+ channel = SocketChannel.open();
+ ((SocketChannel) channel).configureBlocking(false);
+ }
+ return channel;
+ }
+
+ public final boolean getTestOnCreate() {
+ return testOnCreate;
+ }
+
+ public final boolean getTestOnBorrow() {
+ return testOnBorrow;
+ }
+
+ public final boolean getTestOnReturn() {
+ return testOnReturn;
+ }
+
+ public final boolean getTestWhileIdle() {
+ return testWhileIdle;
+ }
+
+ public final long getTimeBetweenEvictionRunsMillis() {
+ return timeBetweenEvictionRunsMillis;
+ }
+
+ public final int getNumTestsPerEvictionRun() {
+ return numTestsPerEvictionRun;
+ }
+
+ /**
+ * Gets the timeout that will be used when waiting for the Evictor to
+ * shutdown if this pool is closed and it is the only pool still using the
+ * the value for the Evictor.
+ *
+ * @return The timeout in milliseconds that will be used while waiting for
+ * the Evictor to shut down.
+ */
+ public final long getEvictorShutdownTimeoutMillis() {
+ return evictorShutdownTimeoutMillis;
+ }
+
+ public long getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public long getConnectionHeartbeatTimeout() {
+ return connectionHeartbeatTimeout;
+ }
+
+ public long getIdleTimeout() {
+ return idleTimeout;
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/pool/PoolConfig.java b/src/main/java/com/actiontech/dble/backend/pool/PoolConfig.java
new file mode 100644
index 000000000..a92ed5043
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/PoolConfig.java
@@ -0,0 +1,241 @@
+package com.actiontech.dble.backend.pool;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class PoolConfig {
+
+ private static final long CONNECTION_TIMEOUT = SECONDS.toMillis(30);
+ private static final long CON_HEARTBEAT_TIMEOUT = MILLISECONDS.toMillis(20);
+ private static final long DEFAULT_IDLE_TIMEOUT = 30 * 60 * 1000L;
+ private static final long HOUSEKEEPING_PERIOD_MS = SECONDS.toMillis(30);
+
+ private volatile long connectionTimeout = CONNECTION_TIMEOUT;
+ private volatile long connectionHeartbeatTimeout = CON_HEARTBEAT_TIMEOUT;
+ private volatile boolean testOnCreate = false;
+ private volatile boolean testOnBorrow = false;
+ private volatile boolean testOnReturn = false;
+ private volatile boolean testWhileIdle = false;
+ private volatile long timeBetweenEvictionRunsMillis = HOUSEKEEPING_PERIOD_MS;
+ private volatile int numTestsPerEvictionRun = 3;
+ private volatile long evictorShutdownTimeoutMillis = 10000L;
+ private volatile long idleTimeout = DEFAULT_IDLE_TIMEOUT;
+
+ public PoolConfig() {
+ }
+
+ /**
+ * Returns whether objects created for the pool will be validated before
+ * being returned from the borrowObject() method. Validation is
+ * performed by the validateObject() method of the factory
+ * associated with the pool. If the object fails to validate, then
+ * borrowObject() will fail.
+ *
+ * @return true if newly created objects are validated before
+ * being returned from the borrowObject() method
+ * @see #setTestOnCreate
+ * @since 2.2
+ */
+ public final boolean getTestOnCreate() {
+ return testOnCreate;
+ }
+
+ /**
+ * Sets whether objects created for the pool will be validated before
+ * being returned from the borrowObject() method. Validation is
+ * performed by the validateObject() method of the factory
+ * associated with the pool. If the object fails to validate, then
+ * borrowObject() will fail.
+ *
+ * @param testOnCreate true if newly created objects should be
+ * validated before being returned from the
+ * borrowObject() method
+ * @see #getTestOnCreate
+ * @since 2.2
+ */
+ public final void setTestOnCreate(final boolean testOnCreate) {
+ this.testOnCreate = testOnCreate;
+ }
+
+ /**
+ * Returns whether objects borrowed from the pool will be validated before
+ * being returned from the borrowObject() method. Validation is
+ * performed by the validateObject() method of the factory
+ * associated with the pool. If the object fails to validate, it will be
+ * removed from the pool and destroyed, and a new attempt will be made to
+ * borrow an object from the pool.
+ *
+ * @return true if objects are validated before being returned
+ * from the borrowObject() method
+ * @see #setTestOnBorrow
+ */
+ public final boolean getTestOnBorrow() {
+ return testOnBorrow;
+ }
+
+ /**
+ * Sets whether objects borrowed from the pool will be validated before
+ * being returned from the borrowObject() method. Validation is
+ * performed by the validateObject() method of the factory
+ * associated with the pool. If the object fails to validate, it will be
+ * removed from the pool and destroyed, and a new attempt will be made to
+ * borrow an object from the pool.
+ *
+ * @param testOnBorrow true if objects should be validated
+ * before being returned from the
+ * borrowObject() method
+ * @see #getTestOnBorrow
+ */
+ public final void setTestOnBorrow(final boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ }
+
+ /**
+ * Returns whether objects borrowed from the pool will be validated when
+ * they are returned to the pool via the returnObject() method.
+ * Validation is performed by the validateObject() method of
+ * the factory associated with the pool. Returning objects that fail validation
+ * are destroyed rather then being returned the pool.
+ *
+ * @return true if objects are validated on return to
+ * the pool via the returnObject() method
+ * @see #setTestOnReturn
+ */
+ public final boolean getTestOnReturn() {
+ return testOnReturn;
+ }
+
+ /**
+ * Sets whether objects borrowed from the pool will be validated when
+ * they are returned to the pool via the returnObject() method.
+ * Validation is performed by the validateObject() method of
+ * the factory associated with the pool. Returning objects that fail validation
+ * are destroyed rather then being returned the pool.
+ *
+ * @param testOnReturn true if objects are validated on
+ * return to the pool via the
+ * returnObject() method
+ * @see #getTestOnReturn
+ */
+ public final void setTestOnReturn(final boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ }
+
+ /**
+ * Returns whether objects sitting idle in the pool will be validated by the
+ * idle object evictor (if any - see
+ * {@link #setTimeBetweenEvictionRunsMillis(long)}). Validation is performed
+ * by the validateObject() method of the factory associated
+ * with the pool. If the object fails to validate, it will be removed from
+ * the pool and destroyed.
+ *
+ * @return true if objects will be validated by the evictor
+ * @see #setTestWhileIdle
+ * @see #setTimeBetweenEvictionRunsMillis
+ */
+ public final boolean getTestWhileIdle() {
+ return testWhileIdle;
+ }
+
+ /**
+ * Returns whether objects sitting idle in the pool will be validated by the
+ * idle object evictor (if any - see
+ * {@link #setTimeBetweenEvictionRunsMillis(long)}). Validation is performed
+ * by the validateObject() method of the factory associated
+ * with the pool. If the object fails to validate, it will be removed from
+ * the pool and destroyed. Note that setting this property has no effect
+ * unless the idle object evictor is enabled by setting
+ * timeBetweenEvictionRunsMillis to a positive value.
+ *
+ * @param testWhileIdle true so objects will be validated by the evictor
+ * @see #getTestWhileIdle
+ * @see #setTimeBetweenEvictionRunsMillis
+ */
+ public final void setTestWhileIdle(final boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ }
+
+ /**
+ * Returns the number of milliseconds to sleep between runs of the idle
+ * object evictor thread. When non-positive, no idle object evictor thread
+ * will be run.
+ *
+ * @return number of milliseconds to sleep between evictor runs
+ * @see #setTimeBetweenEvictionRunsMillis
+ */
+ public final long getTimeBetweenEvictionRunsMillis() {
+ return timeBetweenEvictionRunsMillis;
+ }
+
+ /**
+ * Sets the number of milliseconds to sleep between runs of the idle object evictor thread.
+ *
+ * - When positive, the idle object evictor thread starts.
+ * - When non-positive, no idle object evictor thread runs.
+ *
+ *
+ * @param timeBetweenEvictionRunsMillis number of milliseconds to sleep between evictor runs
+ * @see #getTimeBetweenEvictionRunsMillis
+ */
+ public final void setTimeBetweenEvictionRunsMillis(
+ final long timeBetweenEvictionRunsMillis) {
+ this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
+ }
+
+ public final int getNumTestsPerEvictionRun() {
+ return numTestsPerEvictionRun;
+ }
+
+ public final void setNumTestsPerEvictionRun(final int numTestsPerEvictionRun) {
+ this.numTestsPerEvictionRun = numTestsPerEvictionRun;
+ }
+
+ /**
+ * Gets the timeout that will be used when waiting for the Evictor to
+ * shutdown if this pool is closed and it is the only pool still using the
+ * the value for the Evictor.
+ *
+ * @return The timeout in milliseconds that will be used while waiting for
+ * the Evictor to shut down.
+ */
+ public final long getEvictorShutdownTimeoutMillis() {
+ return evictorShutdownTimeoutMillis;
+ }
+
+ /**
+ * Sets the timeout that will be used when waiting for the Evictor to
+ * shutdown if this pool is closed and it is the only pool still using the
+ * the value for the Evictor.
+ *
+ * @param evictorShutdownTimeoutMillis the timeout in milliseconds that
+ * will be used while waiting for the
+ * Evictor to shut down.
+ */
+ public final void setEvictorShutdownTimeoutMillis(final long evictorShutdownTimeoutMillis) {
+ this.evictorShutdownTimeoutMillis = evictorShutdownTimeoutMillis;
+ }
+
+ public long getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(long connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public long getConnectionHeartbeatTimeout() {
+ return connectionHeartbeatTimeout;
+ }
+
+ public void setConnectionHeartbeatTimeout(long connectionHeartbeatTimeout) {
+ this.connectionHeartbeatTimeout = connectionHeartbeatTimeout;
+ }
+
+ public long getIdleTimeout() {
+ return idleTimeout;
+ }
+
+ public void setIdleTimeout(long idleTimeout) {
+ this.idleTimeout = idleTimeout;
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/pool/PooledEntry.java b/src/main/java/com/actiontech/dble/backend/pool/PooledEntry.java
new file mode 100644
index 000000000..fb6912888
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/PooledEntry.java
@@ -0,0 +1,20 @@
+package com.actiontech.dble.backend.pool;
+
+public interface PooledEntry {
+
+ int STATE_REMOVED = -4;
+ int STATE_HEARTBEAT = -3;
+ int STATE_RESERVED = -2;
+ int STATE_IN_USE = -1;
+ int INITIAL = 0;
+ int STATE_NOT_IN_USE = 1;
+
+ boolean compareAndSet(int expect, int update);
+
+ void lazySet(int update);
+
+ int getState();
+
+ void release();
+
+}
diff --git a/src/main/java/com/actiontech/dble/backend/pool/QueuedSequenceSynchronizer.java b/src/main/java/com/actiontech/dble/backend/pool/QueuedSequenceSynchronizer.java
new file mode 100644
index 000000000..29dc80196
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/QueuedSequenceSynchronizer.java
@@ -0,0 +1,92 @@
+package com.actiontech.dble.backend.pool;
+
+import com.actiontech.dble.backend.pool.util.Java8Sequence;
+import com.actiontech.dble.backend.pool.util.Sequence;
+
+import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
+
+public class QueuedSequenceSynchronizer {
+
+ private final Sequence sequence;
+ private final Synchronizer synchronizer;
+
+ /**
+ * Default constructor
+ */
+ public QueuedSequenceSynchronizer() {
+ this.synchronizer = new Synchronizer();
+ this.sequence = new Java8Sequence();
+ }
+
+ /**
+ * Signal any waiting threads.
+ */
+ public void signal() {
+ synchronizer.releaseShared(1);
+ }
+
+ /**
+ * Get the current sequence.
+ *
+ * @return the current sequence
+ */
+ public long currentSequence() {
+ return sequence.get();
+ }
+
+ /**
+ * Block the current thread until the current sequence exceeds the specified threshold, or
+ * until the specified timeout is reached.
+ *
+ * @param seq the threshold the sequence must reach before this thread becomes unblocked
+ * @param nanosTimeout a nanosecond timeout specifying the maximum time to wait
+ * @return true if the threshold was reached, false if the wait timed out
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public boolean waitUntilSequenceExceeded(long seq, long nanosTimeout) throws InterruptedException {
+ return synchronizer.tryAcquireSharedNanos(seq, nanosTimeout);
+ }
+
+ /**
+ * Queries whether any threads are waiting to for the sequence to reach a particular threshold.
+ *
+ * @return true if there may be other threads waiting for a sequence threshold to be reached
+ */
+ public boolean hasQueuedThreads() {
+ return synchronizer.hasQueuedThreads();
+ }
+
+ /**
+ * Returns an estimate of the number of threads waiting for a sequence threshold to be reached. The
+ * value is only an estimate because the number of threads may change dynamically while this method
+ * traverses internal data structures. This method is designed for use in monitoring system state,
+ * not for synchronization control.
+ *
+ * @return the estimated number of threads waiting for a sequence threshold to be reached
+ */
+ public int getQueueLength() {
+ return synchronizer.getQueueLength();
+ }
+
+ private final class Synchronizer extends AbstractQueuedLongSynchronizer {
+ private static final long serialVersionUID = 104753538004341218L;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected long tryAcquireShared(final long seq) {
+ return sequence.get() - (seq + 1);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected boolean tryReleaseShared(final long unused) {
+ sequence.increment();
+ return true;
+ }
+ }
+
+}
diff --git a/src/main/java/com/actiontech/dble/backend/pool/util/Java8Sequence.java b/src/main/java/com/actiontech/dble/backend/pool/util/Java8Sequence.java
new file mode 100644
index 000000000..869044376
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/util/Java8Sequence.java
@@ -0,0 +1,18 @@
+package com.actiontech.dble.backend.pool.util;
+
+import java.util.concurrent.atomic.LongAdder;
+
+
+/**
+ * A monotonically increasing long sequence.
+ *
+ * @author brettw
+ */
+@SuppressWarnings("serial")
+public class Java8Sequence extends LongAdder implements Sequence {
+ @Override
+ public long get() {
+ return this.sum();
+ }
+}
+
diff --git a/src/main/java/com/actiontech/dble/backend/pool/util/Sequence.java b/src/main/java/com/actiontech/dble/backend/pool/util/Sequence.java
new file mode 100644
index 000000000..be49db5a6
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/util/Sequence.java
@@ -0,0 +1,22 @@
+package com.actiontech.dble.backend.pool.util;
+
+/**
+ * A monotonically increasing long sequence.
+ *
+ * @author brettw
+ */
+@SuppressWarnings("serial")
+public interface Sequence {
+ /**
+ * Increments the current sequence by one.
+ */
+ void increment();
+
+ /**
+ * Get the current sequence.
+ *
+ * @return the current sequence.
+ */
+ long get();
+}
+
diff --git a/src/main/java/com/actiontech/dble/backend/pool/util/TimerHolder.java b/src/main/java/com/actiontech/dble/backend/pool/util/TimerHolder.java
new file mode 100644
index 000000000..66e29f8a5
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/pool/util/TimerHolder.java
@@ -0,0 +1,31 @@
+package com.actiontech.dble.backend.pool.util;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import net.sf.ehcache.util.NamedThreadFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public final class TimerHolder {
+
+ private static final long DEFAULT_TICK_DURATION = 10;
+
+ private TimerHolder() {
+ }
+
+ /**
+ * Get a singleton instance of {@link Timer}.
+ * The tick duration is {@link #DEFAULT_TICK_DURATION}.
+ *
+ * @return Timer
+ */
+ public static Timer getTimer() {
+ return DefaultInstance.INSTANCE;
+ }
+
+ private static class DefaultInstance {
+ static final Timer INSTANCE = new HashedWheelTimer(new NamedThreadFactory("DefaultTimer" + DEFAULT_TICK_DURATION),
+ DEFAULT_TICK_DURATION, TimeUnit.MILLISECONDS);
+ }
+
+}
diff --git a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java
index c86107a74..2b959c659 100644
--- a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java
+++ b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.config;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
@@ -85,22 +85,21 @@ public class ConfigInitializer implements ProblemReporter {
private void checkWriteHost() {
if (this.dbGroups.isEmpty()) {
return;
- } else {
- //Mark all dbInstance whether they are fake or not
- for (PhysicalDbGroup dbGroup : this.dbGroups.values()) {
- for (PhysicalDbInstance source : dbGroup.getAllDbInstances()) {
- if (checkSourceFake(source)) {
- source.setFakeNode(true);
- } else if (!source.isDisabled()) {
- this.fullyConfigured = true;
- }
+ }
+ //Mark all dbInstance whether they are fake or not
+ for (PhysicalDbGroup dbGroup : this.dbGroups.values()) {
+ for (PhysicalDbInstance source : dbGroup.getAllDbInstances()) {
+ if (checkSourceFake(source)) {
+ source.setFakeNode(true);
+ } else if (!source.isDisabled()) {
+ this.fullyConfigured = true;
}
}
- // if there are dbGroups exists. no empty shardingNodes allowed
- for (ShardingNode shardingNode : this.shardingNodes.values()) {
- if (shardingNode.getDbGroup() == null) {
- throw new ConfigException("dbGroup not exists " + shardingNode.getDbGroupName());
- }
+ }
+ // if there are dbGroups exists. no empty shardingNodes allowed
+ for (ShardingNode shardingNode : this.shardingNodes.values()) {
+ if (shardingNode.getDbGroup() == null) {
+ throw new ConfigException("dbGroup not exists " + shardingNode.getDbGroupName());
}
}
}
@@ -238,7 +237,7 @@ public class ConfigInitializer implements ProblemReporter {
private void testDbInstance(Set errNodeKeys, Set errSourceKeys, BoolPtr isConnectivity,
BoolPtr isAllDbInstanceConnected, List> nodeList, PhysicalDbGroup pool, PhysicalDbInstance ds) {
- boolean isMaster = ds == pool.getWriteSource();
+ boolean isMaster = ds == pool.getWriteDbInstance();
String dbInstanceName = "dbInstance[" + ds.getDbGroupConfig().getName() + "." + ds.getName() + "]";
try {
BoolPtr isDSConnectedPtr = new BoolPtr(false);
@@ -321,8 +320,6 @@ public class ConfigInitializer implements ProblemReporter {
return erRelations;
}
-
-
private Map initShardingNodes(Map nodeConf) {
Map nodes = new HashMap<>(nodeConf.size());
for (ShardingNodeConfig conf : nodeConf.values()) {
diff --git a/src/main/java/com/actiontech/dble/config/ServerConfig.java b/src/main/java/com/actiontech/dble/config/ServerConfig.java
index 467cde837..90a40eb44 100644
--- a/src/main/java/com/actiontech/dble/config/ServerConfig.java
+++ b/src/main/java/com/actiontech/dble/config/ServerConfig.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.config;
import com.actiontech.dble.DbleServer;
@@ -20,6 +20,7 @@ import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.config.util.ConfigUtil;
+import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.route.parser.ManagerParseConfig;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.server.variables.SystemVariables;
@@ -364,7 +365,8 @@ public class ServerConfig {
if (recycleDbGroups != null) {
for (PhysicalDbGroup oldDbGroup : recycleDbGroups.values()) {
if (oldDbGroup != null) {
- oldDbGroup.stopHeartbeat();
+ ReloadLogHelper.info("reload config, recycle old group. old active backend conn will be close", LOGGER);
+ oldDbGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
}
}
}
@@ -378,13 +380,13 @@ public class ServerConfig {
// 1 start heartbeat
// 2 apply the configure
//---------------------------------------------------
- if (changeOrAddDbGroups != null) {
- for (PhysicalDbGroup newDbGroup : changeOrAddDbGroups.values()) {
- if (newDbGroup != null && isFullyConfigured) {
- newDbGroup.startHeartbeat();
- }
- }
- }
+ // if (changeOrAddDbGroups != null) {
+ // for (PhysicalDbGroup newDbGroup : changeOrAddDbGroups.values()) {
+ // if (newDbGroup != null && isFullyConfigured) {
+ // newDbGroup.startHeartbeat();
+ // }
+ // }
+ // }
this.shardingNodes = newShardingNodes;
this.dbGroups = newDbGroups;
this.fullyConfigured = isFullyConfigured;
diff --git a/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java b/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java
index bacf71abd..042b56a24 100644
--- a/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java
+++ b/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java
@@ -104,12 +104,8 @@ public class GetAndSyncDbInstanceKeyVariables implements Callable
break;
}
keyVariables.setTargetIsolation(SystemConfig.getInstance().getTxIsolation());
-
keyVariables.setMaxPacketSize(Integer.parseInt(result.getResult().get(COLUMN_MAX_PACKET)));
keyVariables.setTargetMaxPacketSize(SystemConfig.getInstance().getMaxPacketSize() + KeyVariables.MARGIN_PACKET_SIZE);
-
-
-
keyVariables.setReadOnly(result.getResult().get(COLUMN_READONLY).equals("1"));
if (needSync) {
diff --git a/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java b/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java
index fd75bfcab..90158d33e 100644
--- a/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java
+++ b/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java
@@ -8,25 +8,25 @@ package com.actiontech.dble.config.loader.xml;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.nio.MySQLInstance;
+import com.actiontech.dble.backend.pool.PoolConfig;
import com.actiontech.dble.config.ProblemReporter;
import com.actiontech.dble.config.Versions;
-import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.config.util.ConfigUtil;
+import com.actiontech.dble.config.util.ParameterMapping;
import com.actiontech.dble.manager.handler.DbGroupHAHandler;
import com.actiontech.dble.util.DecryptUtil;
import com.actiontech.dble.util.ResourceUtil;
+import com.actiontech.dble.util.StringUtil;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import java.io.IOException;
import java.io.InputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -99,7 +99,7 @@ public class XMLDbLoader {
}
}
- private void loadDbGroups(Element root) {
+ private void loadDbGroups(Element root) throws InvocationTargetException, IllegalAccessException {
NodeList list = root.getElementsByTagName("dbGroup");
for (int i = 0, n = list.getLength(); i < n; ++i) {
Set instanceNames = new HashSet<>();
@@ -159,14 +159,14 @@ public class XMLDbLoader {
DbGroupConfig dbGroupConf = new DbGroupConfig(name, writeDbConf, readDbConfList, delayThreshold);
dbGroupConf.setRwSplitMode(rwSplitMode);
- dbGroupConf.setHearbeatSQL(heartbeatSQL);
+ dbGroupConf.setHeartbeatSQL(heartbeatSQL);
dbGroupConf.setHeartbeatTimeout(Integer.parseInt(strHBTimeout) * 1000);
dbGroupConf.setErrorRetryCount(Integer.parseInt(strHBErrorRetryCount));
dbGroupConfigs.put(dbGroupConf.getName(), dbGroupConf);
}
}
- private DbInstanceConfig createDbInstanceConf(String dbGroup, Element node) {
+ private DbInstanceConfig createDbInstanceConf(String dbGroup, Element node) throws InvocationTargetException, IllegalAccessException {
String name = node.getAttribute("name");
String nodeUrl = node.getAttribute("url");
@@ -176,7 +176,7 @@ public class XMLDbLoader {
if (!nameMatcher.matches()) {
throw new ConfigException("dbInstance name " + name + " show be use " + DbGroupHAHandler.DB_NAME_FORMAT + "!");
}
- if (empty(name) || empty(nodeUrl) || empty(user)) {
+ if (StringUtil.isEmpty(name) || StringUtil.isEmpty(nodeUrl) || StringUtil.isEmpty(user)) {
throw new ConfigException(
"dbGroup " + dbGroup +
" define error,some attributes of this element is empty: " +
@@ -190,29 +190,34 @@ public class XMLDbLoader {
password = DecryptUtil.dbHostDecrypt(usingDecrypt, name, user, password);
String disabledStr = ConfigUtil.checkAndGetAttribute(node, "disabled", "false", problemReporter);
boolean disabled = Boolean.parseBoolean(disabledStr);
- String readWeightStr = ConfigUtil.checkAndGetAttribute(node, "readWeight", String.valueOf(PhysicalDbGroup.WEIGHT), problemReporter);
- int readWeight = Integer.parseInt(readWeightStr);
- int maxCon = Integer.parseInt(node.getAttribute("maxCon"));
- int minCon = Integer.parseInt(node.getAttribute("minCon"));
String primaryStr = ConfigUtil.checkAndGetAttribute(node, "primary", "false", problemReporter);
boolean primary = Boolean.parseBoolean(primaryStr);
DbInstanceConfig conf = new DbInstanceConfig(name, ip, port, nodeUrl, user, password, disabled, primary);
+ String readWeightStr = ConfigUtil.checkAndGetAttribute(node, "readWeight", String.valueOf(PhysicalDbGroup.WEIGHT), problemReporter);
+ int readWeight = Integer.parseInt(readWeightStr);
+ int maxCon = Integer.parseInt(node.getAttribute("maxCon"));
+ int minCon = Integer.parseInt(node.getAttribute("minCon"));
conf.setMaxCon(maxCon);
conf.setMinCon(minCon);
conf.setReadWeight(readWeight);
String id = node.getAttribute("id");
- if (!"".equals(id)) {
- conf.setId(id);
- } else {
+ if (StringUtil.isEmpty(id)) {
conf.setId(name);
+ } else {
+ conf.setId(id);
}
- return conf;
- }
+ // init properties of connection pool
+ PoolConfig poolConfig = new PoolConfig();
+ Properties poolProperties = ConfigUtil.loadElements(node);
+ ParameterMapping.mapping(poolConfig, poolProperties, problemReporter);
+ if (poolProperties.size() > 0) {
+ throw new ConfigException("These properties of system are not recognized: " + StringUtil.join(poolProperties.stringPropertyNames(), ","));
+ }
+ conf.setPoolConfig(poolConfig);
- private boolean empty(String dnName) {
- return dnName == null || dnName.length() == 0;
+ return conf;
}
private Map initDbGroups(Map nodeConf) {
@@ -225,9 +230,7 @@ public class XMLDbLoader {
return nodes;
}
- private PhysicalDbInstance createDbInstance(DbGroupConfig conf, DbInstanceConfig node,
- boolean isRead) {
- node.setIdleTimeout(SystemConfig.getInstance().getIdleTimeout());
+ private PhysicalDbInstance createDbInstance(DbGroupConfig conf, DbInstanceConfig node, boolean isRead) {
return new MySQLInstance(node, conf, isRead);
}
diff --git a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java
index 584a9673e..dd415e86c 100644
--- a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java
+++ b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.config.model.db;
@@ -17,7 +18,7 @@ public class DbGroupConfig {
private int rwSplitMode = PhysicalDbGroup.RW_SPLIT_OFF;
private final DbInstanceConfig writeInstanceConfig;
private final DbInstanceConfig[] readInstanceConfigs;
- private String hearbeatSQL;
+ private String heartbeatSQL;
private boolean isShowSlaveSql = false;
private boolean isSelectReadOnlySql = false;
private int delayThreshold = -1;
@@ -66,13 +67,12 @@ public class DbGroupConfig {
return readInstanceConfigs;
}
-
- public String getHearbeatSQL() {
- return hearbeatSQL;
+ public String getHeartbeatSQL() {
+ return heartbeatSQL;
}
- public void setHearbeatSQL(String heartbeatSQL) {
- this.hearbeatSQL = heartbeatSQL;
+ public void setHeartbeatSQL(String heartbeatSQL) {
+ this.heartbeatSQL = heartbeatSQL;
Matcher matcher = HP_PATTERN_SHOW_SLAVE_STATUS.matcher(heartbeatSQL);
if (matcher.find()) {
isShowSlaveSql = true;
diff --git a/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java b/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java
index 578f4efe5..ed226ebfe 100644
--- a/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java
+++ b/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java
@@ -1,28 +1,27 @@
/*
* Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.config.model.db;
-import com.actiontech.dble.config.model.SystemConfig;
+import com.actiontech.dble.backend.pool.PoolConfig;
public class DbInstanceConfig {
- private long idleTimeout = SystemConfig.DEFAULT_IDLE_TIMEOUT;
private final String instanceName;
private final String ip;
private final int port;
private final String url;
private final String user;
private final String password;
- private int maxCon;
- private int minCon;
private int readWeight;
private String id;
-
- private boolean disabled = false;
-
- private boolean primary = false;
+ private boolean disabled;
+ private boolean primary;
+ private volatile int maxCon = -1;
+ private volatile int minCon = -1;
+ private volatile PoolConfig poolConfig;
public DbInstanceConfig(String instanceName, String ip, int port, String url,
String user, String password, boolean disabled, boolean primary) {
@@ -36,30 +35,6 @@ public class DbInstanceConfig {
this.primary = primary;
}
- public long getIdleTimeout() {
- return idleTimeout;
- }
-
- public void setIdleTimeout(long idleTimeout) {
- this.idleTimeout = idleTimeout;
- }
-
- public int getMaxCon() {
- return maxCon;
- }
-
- public void setMaxCon(int maxCon) {
- this.maxCon = maxCon;
- }
-
- public int getMinCon() {
- return minCon;
- }
-
- public void setMinCon(int minCon) {
- this.minCon = minCon;
- }
-
public String getInstanceName() {
return instanceName;
}
@@ -104,7 +79,6 @@ public class DbInstanceConfig {
this.id = id;
}
-
public boolean isPrimary() {
return primary;
}
@@ -113,6 +87,30 @@ public class DbInstanceConfig {
this.primary = primary;
}
+ public int getMaxCon() {
+ return maxCon;
+ }
+
+ public void setMaxCon(int maxCon) {
+ this.maxCon = maxCon;
+ }
+
+ public int getMinCon() {
+ return minCon;
+ }
+
+ public void setMinCon(int minCon) {
+ this.minCon = minCon;
+ }
+
+ public PoolConfig getPoolConfig() {
+ return poolConfig;
+ }
+
+ public void setPoolConfig(PoolConfig poolConfig) {
+ this.poolConfig = poolConfig;
+ }
+
@Override
public String toString() {
return "DbInstanceConfig [hostName=" + instanceName + ", url=" + url + "]";
diff --git a/src/main/java/com/actiontech/dble/manager/ManagerConnection.java b/src/main/java/com/actiontech/dble/manager/ManagerConnection.java
index 9e94c396f..181dc5c12 100644
--- a/src/main/java/com/actiontech/dble/manager/ManagerConnection.java
+++ b/src/main/java/com/actiontech/dble/manager/ManagerConnection.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager;
import com.actiontech.dble.backend.BackendConnection;
@@ -22,6 +22,7 @@ public class ManagerConnection extends FrontendConnection {
private static final long AUTH_TIMEOUT = 15 * 1000L;
private volatile boolean skipIdleCheck = false;
private ManagerUserConfig userConfig;
+
public ManagerConnection(NetworkChannel channel) throws IOException {
super(channel);
this.handler = new ManagerAuthenticator(this);
@@ -34,6 +35,7 @@ public class ManagerConnection extends FrontendConnection {
public void setUserConfig(ManagerUserConfig userConfig) {
this.userConfig = userConfig;
}
+
@Override
public void handlerQuery(String sql) {
// execute
@@ -44,15 +46,15 @@ public class ManagerConnection extends FrontendConnection {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
}
}
- @Override
+
+
public boolean isIdleTimeout() {
if (skipIdleCheck) {
return false;
} else if (isAuthenticated) {
- return super.isIdleTimeout();
+ return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime, lastReadTime) + idleTimeout;
} else {
- return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime,
- lastReadTime) + AUTH_TIMEOUT;
+ return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime, lastReadTime) + AUTH_TIMEOUT;
}
}
diff --git a/src/main/java/com/actiontech/dble/manager/handler/DatabaseHandler.java b/src/main/java/com/actiontech/dble/manager/handler/DatabaseHandler.java
index 3eae7a934..f6ef703f5 100644
--- a/src/main/java/com/actiontech/dble/manager/handler/DatabaseHandler.java
+++ b/src/main/java/com/actiontech/dble/manager/handler/DatabaseHandler.java
@@ -72,7 +72,7 @@ public final class DatabaseHandler {
final AtomicInteger numberCount = new AtomicInteger(shardingNodes.size());
for (final String shardingNode : shardingNodes) {
ShardingNode dn = allShardingNodes.get(shardingNode);
- final PhysicalDbInstance ds = dn.getDbGroup().getWriteSource();
+ final PhysicalDbInstance ds = dn.getDbGroup().getWriteDbInstance();
final String schema = dn.getDatabase();
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new SQLQueryResultListener>>() {
@Override
diff --git a/src/main/java/com/actiontech/dble/manager/handler/SelectHandler.java b/src/main/java/com/actiontech/dble/manager/handler/SelectHandler.java
index 12f17ac75..f0e78ab90 100644
--- a/src/main/java/com/actiontech/dble/manager/handler/SelectHandler.java
+++ b/src/main/java/com/actiontech/dble/manager/handler/SelectHandler.java
@@ -60,7 +60,7 @@ public final class SelectHandler {
Iterator iterator = DbleServer.getInstance().getConfig().getDbGroups().values().iterator();
if (iterator.hasNext()) {
PhysicalDbGroup pool = iterator.next();
- final PhysicalDbInstance source = pool.getWriteSource();
+ final PhysicalDbInstance source = pool.getWriteDbInstance();
TransformSQLJob sqlJob = new TransformSQLJob(stmt, null, source, c);
sqlJob.run();
} else {
diff --git a/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java b/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java
index 27e660207..7bae4067d 100644
--- a/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java
+++ b/src/main/java/com/actiontech/dble/manager/handler/ShowHandler.java
@@ -253,7 +253,7 @@ public final class ShowHandler {
Iterator iterator = DbleServer.getInstance().getConfig().getDbGroups().values().iterator();
if (iterator.hasNext()) {
PhysicalDbGroup pool = iterator.next();
- final PhysicalDbInstance source = pool.getWriteSource();
+ final PhysicalDbInstance source = pool.getWriteDbInstance();
TransformSQLJob sqlJob = new TransformSQLJob(stmt, null, source, c);
sqlJob.run();
} else {
diff --git a/src/main/java/com/actiontech/dble/manager/handler/ShowProcesslistHandler.java b/src/main/java/com/actiontech/dble/manager/handler/ShowProcesslistHandler.java
index 86289f749..d7321eae4 100644
--- a/src/main/java/com/actiontech/dble/manager/handler/ShowProcesslistHandler.java
+++ b/src/main/java/com/actiontech/dble/manager/handler/ShowProcesslistHandler.java
@@ -41,7 +41,7 @@ public class ShowProcesslistHandler {
public void execute() {
String sbSql = SQL.replace("{0}", StringUtils.join(threadIds, ','));
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(shardingNode);
- PhysicalDbInstance ds = dn.getDbGroup().getWriteSource();
+ PhysicalDbInstance ds = dn.getDbGroup().getWriteDbInstance();
if (ds.isAlive()) {
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(MYSQL_SHOW_PROCESSLIST_COLS, new MySQLShowProcesslistListener());
SQLJob sqlJob = new SQLJob(sbSql, dn.getDatabase(), resultHandler, ds);
diff --git a/src/main/java/com/actiontech/dble/manager/response/DryRun.java b/src/main/java/com/actiontech/dble/manager/response/DryRun.java
index 11082d8f2..6b80c8f24 100644
--- a/src/main/java/com/actiontech/dble/manager/response/DryRun.java
+++ b/src/main/java/com/actiontech/dble/manager/response/DryRun.java
@@ -124,8 +124,8 @@ public final class DryRun {
}
}
- if (handler.getUsedDataource() != null) {
- handler.getUsedDataource().clearCons("dry run end");
+ if (handler.getUsedDbInstance() != null) {
+ handler.getUsedDbInstance().closeAllConnection("dry run end");
}
userCheck(list, serverConfig);
diff --git a/src/main/java/com/actiontech/dble/manager/response/FlowControlList.java b/src/main/java/com/actiontech/dble/manager/response/FlowControlList.java
index 96e8cc70b..3c407393a 100644
--- a/src/main/java/com/actiontech/dble/manager/response/FlowControlList.java
+++ b/src/main/java/com/actiontech/dble/manager/response/FlowControlList.java
@@ -44,7 +44,7 @@ public final class FlowControlList {
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("WRITE_QUEUE_SIZE", Fields.FIELD_TYPE_LONGLONG);
- FIELDS[i++].setPacketId(++packetId);
+ FIELDS[i].setPacketId(++packetId);
EOF.setPacketId(++packetId);
}
@@ -73,7 +73,7 @@ public final class FlowControlList {
//find all server connection
packetId = findAllServerConnection(buffer, c, packetId);
//find all mysql connection
- packetId = findAllMySQLConeection(buffer, c, packetId);
+ packetId = findAllMySQLConnection(buffer, c, packetId);
}
// write last eof
@@ -94,7 +94,7 @@ public final class FlowControlList {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode("ServerConnection", c.getCharset().getResults()));
row.add(LongUtil.toBytes(fc.getId()));
- row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((ServerConnection) fc).getSchema() + " user = " + fc.getUser(), c.getCharset().getResults()));
+ row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + fc.getSchema() + " user = " + fc.getUser(), c.getCharset().getResults()));
row.add(LongUtil.toBytes(fc.getWriteQueue().size()));
row.setPacketId(++packetId);
buffer = row.write(buffer, c, true);
@@ -104,7 +104,7 @@ public final class FlowControlList {
return packetId;
}
- private static byte findAllMySQLConeection(ByteBuffer buffer, ManagerConnection c, byte packetId) {
+ private static byte findAllMySQLConnection(ByteBuffer buffer, ManagerConnection c, byte packetId) {
NIOProcessor[] processors = DbleServer.getInstance().getBackendProcessors();
for (NIOProcessor p : processors) {
for (BackendConnection bc : p.getBackends().values()) {
@@ -113,7 +113,7 @@ public final class FlowControlList {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode("MySQLConnection", c.getCharset().getResults()));
row.add(LongUtil.toBytes(mc.getThreadId()));
- row.add(StringUtil.encode(mc.getPool().getConfig().getUrl() + "/" + mc.getSchema() + " id = " + mc.getThreadId(), c.getCharset().getResults()));
+ row.add(StringUtil.encode(mc.getDbInstance().getConfig().getUrl() + "/" + mc.getSchema() + " id = " + mc.getThreadId(), c.getCharset().getResults()));
row.add(LongUtil.toBytes(mc.getWriteQueue().size()));
row.setPacketId(++packetId);
buffer = row.write(buffer, c, true);
diff --git a/src/main/java/com/actiontech/dble/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/manager/response/ReloadConfig.java
index 750cd0eb4..90cd69ea1 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ReloadConfig.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ReloadConfig.java
@@ -1,17 +1,14 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
-import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbGroupDiff;
-import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
-import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.btrace.provider.ClusterDelayProvider;
import com.actiontech.dble.cluster.ClusterHelper;
import com.actiontech.dble.cluster.ClusterPathUtil;
@@ -38,7 +35,6 @@ import com.actiontech.dble.meta.ReloadManager;
import com.actiontech.dble.net.FrontendConnection;
import com.actiontech.dble.net.NIOProcessor;
import com.actiontech.dble.net.mysql.OkPacket;
-import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.route.parser.ManagerParseConfig;
import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.server.variables.SystemVariables;
@@ -387,7 +383,6 @@ public final class ReloadConfig {
}
FrontendUserManager.getInstance().initForLatest(newUsers, SystemConfig.getInstance().getMaxCon());
ReloadLogHelper.info("reload config: apply new config end", LOGGER);
- recycleOldBackendConnections(recycleHosts, ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
@@ -402,12 +397,11 @@ public final class ReloadConfig {
}
}
- private static void initFailed(Map newDbGroups) throws Exception {
+ private static void initFailed(Map newDbGroups) {
// INIT FAILED
ReloadLogHelper.info("reload failed, clear previously created dbInstances ", LOGGER);
for (PhysicalDbGroup dbGroup : newDbGroups.values()) {
- dbGroup.clearDbInstances("reload config");
- dbGroup.stopHeartbeat();
+ dbGroup.stop("reload fail, stop");
}
}
@@ -450,7 +444,6 @@ public final class ReloadConfig {
}
FrontendUserManager.getInstance().initForLatest(newUsers, SystemConfig.getInstance().getMaxCon());
ReloadLogHelper.info("reload config: apply new config end", LOGGER);
- recycleOldBackendConnections(config.getBackupDbGroups(), ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
@@ -492,30 +485,6 @@ public final class ReloadConfig {
return newSystemVariables;
}
- private static void findAndcloseFrontCon(BackendConnection con) {
- if (con instanceof MySQLConnection) {
- MySQLConnection mcon1 = (MySQLConnection) con;
- for (NIOProcessor processor : DbleServer.getInstance().getFrontProcessors()) {
- for (FrontendConnection fcon : processor.getFrontends().values()) {
- if (fcon instanceof ServerConnection) {
- ServerConnection scon = (ServerConnection) fcon;
- Map bons = scon.getSession2().getTargetMap();
- for (BackendConnection bcon : bons.values()) {
- if (bcon instanceof MySQLConnection) {
- MySQLConnection mcon2 = (MySQLConnection) bcon;
- if (mcon1 == mcon2) {
- //frontEnd kill change to frontEnd close ,it's not necessary to use kill
- scon.close("reload config all");
- return;
- }
- }
- }
- }
- }
- }
- }
- }
-
private static void recycleServerConnections() {
for (NIOProcessor processor : DbleServer.getInstance().getFrontProcessors()) {
for (FrontendConnection fcon : processor.getFrontends().values()) {
@@ -527,50 +496,6 @@ public final class ReloadConfig {
}
}
- private static void recycleOldBackendConnections(Map recycleMap, boolean closeFrontCon) {
- for (PhysicalDbGroup dbGroup : recycleMap.values()) {
- dbGroup.stopHeartbeat();
- long oldTimestamp = System.currentTimeMillis();
- for (PhysicalDbInstance ds : dbGroup.getAllActiveDbInstances()) {
- for (NIOProcessor processor : DbleServer.getInstance().getBackendProcessors()) {
- for (BackendConnection con : processor.getBackends().values()) {
- if (con instanceof MySQLConnection) {
- MySQLConnection mysqlCon = (MySQLConnection) con;
- if (mysqlCon.getPool() == ds) {
- if (con.isBorrowed()) {
- if (closeFrontCon) {
- ReloadLogHelper.info("old active backend conn will be forced closed by closing front conn, conn info:" + mysqlCon, LOGGER);
- findAndcloseFrontCon(con);
- } else {
- ReloadLogHelper.info("old active backend conn will be added to old pool, conn info:" + mysqlCon, LOGGER);
- con.setOldTimestamp(oldTimestamp);
- NIOProcessor.BACKENDS_OLD.add(con);
- }
- } else {
- ReloadLogHelper.info("old idle backend conn will be closed, conn info:" + mysqlCon, LOGGER);
- con.close("old idle conn for reload merge");
- }
- }
- }
- }
- }
- }
- }
- if (closeFrontCon) {
- for (NIOProcessor processor : DbleServer.getInstance().getBackendProcessors()) {
- for (BackendConnection con : processor.getBackends().values()) {
- if (con instanceof MySQLConnection) {
- MySQLConnection mysqlCon = (MySQLConnection) con;
- if (mysqlCon.getOldTimestamp() != 0) {
- findAndcloseFrontCon(con);
- }
- }
- }
- }
- }
-
- }
-
private static void distinguishDbGroup(Map newDbGroups, Map oldDbGroups,
Map addOrChangeDbGroups, Map noChangeDbGroups,
Map recycleHosts) {
@@ -634,14 +559,10 @@ public final class ReloadConfig {
}
}
dbGroup.setSchemas(dnSchemas.toArray(new String[dnSchemas.size()]));
- if (!dbGroup.isInitSuccess() && fullyConfigured) {
+ if (fullyConfigured) {
dbGroup.init();
- if (!dbGroup.isInitSuccess()) {
- reasonMsg = "Init dbGroup [" + dbGroup.getGroupName() + "] failed";
- break;
- }
} else {
- LOGGER.info("dbGroup[" + hostName + "] already initiated, so doing nothing");
+ LOGGER.info("dbGroup[" + hostName + "] is not fullyConfigured, so doing nothing");
}
}
return reasonMsg;
diff --git a/src/main/java/com/actiontech/dble/manager/response/RollbackConfig.java b/src/main/java/com/actiontech/dble/manager/response/RollbackConfig.java
index 68fca2b5d..5dfe5947b 100644
--- a/src/main/java/com/actiontech/dble/manager/response/RollbackConfig.java
+++ b/src/main/java/com/actiontech/dble/manager/response/RollbackConfig.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
@@ -256,24 +256,9 @@ public final class RollbackConfig {
Map> erRelations = conf.getBackupErRelations();
boolean backIsFullyConfiged = conf.backIsFullyConfiged();
if (conf.canRollbackAll()) {
- boolean rollbackStatus = true;
- String errorMsg = null;
if (conf.isFullyConfigured()) {
for (PhysicalDbGroup dn : dbGroups.values()) {
dn.init();
- if (!dn.isInitSuccess()) {
- rollbackStatus = false;
- errorMsg = "dbGroup[" + dn.getGroupName() + "] inited failure";
- break;
- }
- }
- // INIT FAILED
- if (!rollbackStatus) {
- for (PhysicalDbGroup dn : dbGroups.values()) {
- dn.clearDbInstances("rollbackup config");
- dn.stopHeartbeat();
- }
- throw new Exception(errorMsg);
}
}
final Map cNodes = conf.getDbGroups();
@@ -281,8 +266,7 @@ public final class RollbackConfig {
boolean result = conf.rollback(users, schemas, shardingNodes, dbGroups, erRelations, backIsFullyConfiged);
// stop old resource heartbeat
for (PhysicalDbGroup dn : cNodes.values()) {
- dn.clearDbInstances("clear old config ");
- dn.stopHeartbeat();
+ dn.stop("initial failed, rollback up config");
}
if (!backIsFullyConfiged) {
for (NIOProcessor processor : DbleServer.getInstance().getFrontProcessors()) {
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowBackend.java b/src/main/java/com/actiontech/dble/manager/response/ShowBackend.java
index 431e95f6a..b8e2781e9 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowBackend.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowBackend.java
@@ -7,9 +7,11 @@ package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
+import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob;
import com.actiontech.dble.backend.mysql.PacketUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
+import com.actiontech.dble.backend.pool.PooledEntry;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.manager.ManagerConnection;
@@ -19,7 +21,6 @@ import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.ResultSetHeaderPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.route.factory.RouteStrategyFactory;
-import com.actiontech.dble.sqlengine.HeartbeatSQLJob;
import com.actiontech.dble.util.*;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
@@ -67,7 +68,7 @@ public final class ShowBackend {
FIELDS[i++].setPacketId(++packetId);
// fields[i] = PacketUtil.getField("run", Fields.FIELD_TYPE_VAR_STRING);
// fields[i++].packetId = ++packetId;
- FIELDS[i] = PacketUtil.getField("BORROWED", Fields.FIELD_TYPE_VAR_STRING);
+ FIELDS[i] = PacketUtil.getField("STATE", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("SEND_QUEUE", Fields.FIELD_TYPE_LONG);
FIELDS[i++].setPacketId(++packetId);
@@ -198,6 +199,7 @@ public final class ShowBackend {
if (!(c instanceof MySQLConnection)) {
return null;
}
+ int state = c.getState();
MySQLConnection conn = (MySQLConnection) c;
row.add(conn.getProcessor().getName().getBytes());
row.add(LongUtil.toBytes(c.getId()));
@@ -209,7 +211,7 @@ public final class ShowBackend {
row.add(LongUtil.toBytes(c.getNetOutBytes()));
row.add(LongUtil.toBytes((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000L));
row.add(c.isClosed() ? "true".getBytes() : "false".getBytes());
- row.add(c.isBorrowed() ? "true".getBytes() : "false".getBytes());
+ row.add(stateStr(state).getBytes());
row.add(IntegerUtil.toBytes(conn.getWriteQueue().size()));
row.add((conn.getSchema() == null ? "NULL" : conn.getSchema()).getBytes());
row.add(conn.getCharset().getClient().getBytes());
@@ -221,12 +223,31 @@ public final class ShowBackend {
row.add(StringUtil.encode(conn.getStringOfUsrVariables(), charset));
row.add(StringUtil.encode(conn.getXaStatus().toString(), charset));
row.add(StringUtil.encode(FormatUtil.formatDate(conn.getOldTimestamp()), charset));
- if (c.isBorrowed()) {
+ if (state == PooledEntry.INITIAL) {
ResponseHandler handler = ((MySQLConnection) c).getRespHandler();
- row.add(handler != null && handler instanceof HeartbeatSQLJob ? "true".getBytes() : "false".getBytes());
+ row.add(handler instanceof HeartbeatSQLJob ? "true".getBytes() : "false".getBytes());
} else {
row.add("false".getBytes());
}
return row;
}
+
+ public static String stateStr(int state) {
+ switch (state) {
+ case PooledEntry.STATE_IN_USE:
+ return "IN USE";
+ case PooledEntry.STATE_NOT_IN_USE:
+ return "IDLE";
+ case PooledEntry.STATE_REMOVED:
+ return "REMOVED";
+ case PooledEntry.STATE_HEARTBEAT:
+ return "HEARTBEAT CHECK";
+ case PooledEntry.STATE_RESERVED:
+ return "EVICT";
+ case PooledEntry.INITIAL:
+ return "IN CREATION OR OUT OF POOL";
+ default:
+ return "UNKNOWN STATE";
+ }
+ }
}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowBackendOld.java b/src/main/java/com/actiontech/dble/manager/response/ShowBackendOld.java
index 2b9d8f383..5978f7353 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowBackendOld.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowBackendOld.java
@@ -8,6 +8,7 @@ package com.actiontech.dble.manager.response;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.PacketUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
+import com.actiontech.dble.backend.pool.PooledEntry;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.manager.ManagerConnection;
import com.actiontech.dble.net.NIOProcessor;
@@ -101,7 +102,7 @@ public final class ShowBackendOld {
row.add(LongUtil.toBytes(c.getNetOutBytes()));
row.add(LongUtil.toBytes((TimeUtil.currentTimeMillis() - c.getStartupTime()) / 1000L));
row.add(LongUtil.toBytes(c.getLastTime()));
- boolean isBorrowed = c.isBorrowed();
+ boolean isBorrowed = c.getState() == PooledEntry.STATE_IN_USE;
row.add(isBorrowed ? "true".getBytes() : "false".getBytes());
return row;
}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowBackendStat.java b/src/main/java/com/actiontech/dble/manager/response/ShowBackendStat.java
index 162f4d774..bc22e4a4f 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowBackendStat.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowBackendStat.java
@@ -1,14 +1,15 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.PacketUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
+import com.actiontech.dble.backend.pool.PooledEntry;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.manager.ManagerConnection;
import com.actiontech.dble.net.NIOProcessor;
@@ -52,7 +53,7 @@ public final class ShowBackendStat {
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("TOTAL", Fields.FIELD_TYPE_LONGLONG);
- FIELDS[i++].setPacketId(++packetId);
+ FIELDS[i].setPacketId(++packetId);
EOF.setPacketId(++packetId);
}
@@ -71,7 +72,7 @@ public final class ShowBackendStat {
HashMap infos = stat();
byte packetId = EOF.getPacketId();
- for (Map.Entry entry: infos.entrySet()) {
+ for (Map.Entry entry : infos.entrySet()) {
RowDataPacket row = getRow(entry.getValue(), c.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, c, true);
@@ -93,24 +94,24 @@ public final class ShowBackendStat {
}
private static HashMap stat() {
- HashMap all = new HashMap();
+ HashMap all = new HashMap<>();
for (NIOProcessor p : DbleServer.getInstance().getBackendProcessors()) {
for (BackendConnection bc : p.getBackends().values()) {
- if ((bc == null) || !(bc instanceof MySQLConnection)) {
+ if (!(bc instanceof MySQLConnection)) {
break;
}
MySQLConnection con = (MySQLConnection) bc;
String host = con.getHost();
long port = con.getPort();
- BackendStat info = all.get(host + Long.toString(port));
+ BackendStat info = all.get(host + port);
if (info == null) {
info = new BackendStat(host, port);
- all.put(host + Long.toString(port), info);
+ all.put(host + port, info);
}
- if (con.isBorrowed()) {
+ if (con.getState() == PooledEntry.STATE_IN_USE) {
info.addActive();
}
info.addTotal();
@@ -132,21 +133,27 @@ public final class ShowBackendStat {
this.active = 0;
this.total = 0;
}
+
public String getHost() {
return this.host;
}
+
public long getPort() {
return this.port;
}
+
public void addActive() {
this.active++;
}
+
public void addTotal() {
this.total++;
}
+
public long getActive() {
return this.active;
}
+
public long getTotal() {
return this.total;
}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowBinlogStatus.java b/src/main/java/com/actiontech/dble/manager/response/ShowBinlogStatus.java
index 908aa8a3b..2b6691391 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowBinlogStatus.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowBinlogStatus.java
@@ -347,7 +347,7 @@ public final class ShowBinlogStatus {
rows = new CopyOnWriteArrayList<>();
for (PhysicalDbGroup pool : allPools) {
//if WRITE_RANDOM_NODE ,may the binlog is not ready.
- final PhysicalDbInstance source = pool.getWriteSource();
+ final PhysicalDbInstance source = pool.getWriteDbInstance();
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(FIELDS,
new SQLQueryResultListener>>() {
@Override
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowDbInstance.java b/src/main/java/com/actiontech/dble/manager/response/ShowDbInstance.java
index ed4e441a5..c218e0bb0 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowDbInstance.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowDbInstance.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
@@ -28,7 +28,7 @@ public final class ShowDbInstance {
private ShowDbInstance() {
}
- private static final int FIELD_COUNT = 12;
+ private static final int FIELD_COUNT = 11;
private static final ResultSetHeaderPacket HEADER = PacketUtil.getHeader(FIELD_COUNT);
private static final FieldPacket[] FIELDS = new FieldPacket[FIELD_COUNT];
private static final EOFPacket EOF = new EOFPacket();
@@ -62,9 +62,6 @@ public final class ShowDbInstance {
FIELDS[i] = PacketUtil.getField("SIZE", Fields.FIELD_TYPE_LONG);
FIELDS[i++].setPacketId(++packetId);
- FIELDS[i] = PacketUtil.getField("EXECUTE", Fields.FIELD_TYPE_LONGLONG);
- FIELDS[i++].setPacketId(++packetId);
-
FIELDS[i] = PacketUtil.getField("READ_LOAD", Fields.FIELD_TYPE_LONG);
FIELDS[i++].setPacketId(++packetId);
@@ -127,18 +124,16 @@ public final class ShowDbInstance {
private static RowDataPacket getRow(String dbGroup, PhysicalDbInstance ds,
String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
- int idleCount = ds.getIdleCount();
row.add(StringUtil.encode(dbGroup, charset));
row.add(StringUtil.encode(ds.getName(), charset));
row.add(StringUtil.encode(ds.getConfig().getIp(), charset));
row.add(IntegerUtil.toBytes(ds.getConfig().getPort()));
row.add(StringUtil.encode(ds.isReadInstance() ? "R" : "W", charset));
- row.add(IntegerUtil.toBytes(ds.getTotalConCount() - idleCount));
- row.add(IntegerUtil.toBytes(idleCount));
- row.add(IntegerUtil.toBytes(ds.getSize()));
- row.add(LongUtil.toBytes(ds.getExecuteCount()));
- row.add(LongUtil.toBytes(ds.getReadCount()));
- row.add(LongUtil.toBytes(ds.getWriteCount()));
+ row.add(IntegerUtil.toBytes(ds.getActiveConnections()));
+ row.add(IntegerUtil.toBytes(ds.getIdleConnections()));
+ row.add(LongUtil.toBytes(ds.getTotalConnections()));
+ row.add(LongUtil.toBytes(ds.getCount(true)));
+ row.add(LongUtil.toBytes(ds.getCount(false)));
row.add(StringUtil.encode(ds.isDisabled() ? "true" : "false", charset));
return row;
}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowHeartbeat.java b/src/main/java/com/actiontech/dble/manager/response/ShowHeartbeat.java
index 0455b3824..685ff4ae8 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowHeartbeat.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowHeartbeat.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
@@ -73,7 +73,7 @@ public final class ShowHeartbeat {
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("RS_MESSAGE ", Fields.FIELD_TYPE_VAR_STRING);
- FIELDS[i++].setPacketId(++packetId);
+ FIELDS[i].setPacketId(++packetId);
EOF.setPacketId(++packetId);
}
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowShardingNode.java b/src/main/java/com/actiontech/dble/manager/response/ShowShardingNode.java
index f2ce179e5..96e930fe2 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowShardingNode.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowShardingNode.java
@@ -28,7 +28,10 @@ import com.actiontech.dble.util.TimeUtil;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.text.NumberFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
/**
* ShowShardingNode
@@ -107,7 +110,7 @@ public final class ShowShardingNode {
keys.addAll(sc.getAllShardingNodes());
}
}
- Collections.sort(keys, new Comparator() {
+ keys.sort(new Comparator() {
@Override
public int compare(String o1, String o2) {
Pair p1 = PairUtil.splitIndex(o1, '[', ']');
@@ -138,7 +141,7 @@ public final class ShowShardingNode {
private static RowDataPacket getRow(ShardingNode node, String charset) {
PhysicalDbGroup pool = node.getDbGroup();
- PhysicalDbInstance ds = pool.getWriteSource();
+ PhysicalDbInstance ds = pool.getWriteDbInstance();
if (ds != null) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(node.getName(), charset));
@@ -146,13 +149,13 @@ public final class ShowShardingNode {
node.getDbGroup().getGroupName() + '/' + node.getDatabase(),
charset));
row.add(StringUtil.encode(node.isSchemaExists() ? "true" : "false", charset));
- int active = ds.getActiveCountForSchema(node.getDatabase());
- int idle = ds.getIdleCountForSchema(node.getDatabase());
+ int active = ds.getActiveConnections(node.getDatabase());
+ int idle = ds.getIdleConnections(node.getDatabase());
row.add(IntegerUtil.toBytes(active));
row.add(IntegerUtil.toBytes(idle));
- row.add(IntegerUtil.toBytes(ds.getSize()));
- row.add(LongUtil.toBytes(ds.getExecuteCountForSchema(node.getDatabase())));
- long recoveryTime = pool.getWriteSource().getHeartbeatRecoveryTime() - TimeUtil.currentTimeMillis();
+ row.add(IntegerUtil.toBytes(ds.getConfig().getMaxCon()));
+ row.add(LongUtil.toBytes(0));
+ long recoveryTime = ds.getHeartbeatRecoveryTime() - TimeUtil.currentTimeMillis();
row.add(LongUtil.toBytes(recoveryTime > 0 ? recoveryTime / 1000L : -1L));
return row;
} else {
diff --git a/src/main/java/com/actiontech/dble/manager/response/ShowTableShardingNode.java b/src/main/java/com/actiontech/dble/manager/response/ShowTableShardingNode.java
index 5dd57f89b..99f2400bc 100644
--- a/src/main/java/com/actiontech/dble/manager/response/ShowTableShardingNode.java
+++ b/src/main/java/com/actiontech/dble/manager/response/ShowTableShardingNode.java
@@ -140,7 +140,7 @@ public final class ShowTableShardingNode {
int sequence = 0;
for (String shardingNode : shardingNodes) {
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(shardingNode);
- DbInstanceConfig dbConfig = dn.getDbGroup().getWriteSource().getConfig();
+ DbInstanceConfig dbConfig = dn.getDbGroup().getWriteDbInstance().getConfig();
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(dn.getName(), charset));
row.add(LongUtil.toBytes(sequence));
diff --git a/src/main/java/com/actiontech/dble/manager/response/StopHeartbeat.java b/src/main/java/com/actiontech/dble/manager/response/StopHeartbeat.java
index a7175a95b..0bd5c09c0 100644
--- a/src/main/java/com/actiontech/dble/manager/response/StopHeartbeat.java
+++ b/src/main/java/com/actiontech/dble/manager/response/StopHeartbeat.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
@@ -38,7 +38,7 @@ public final class StopHeartbeat {
for (String key : keys.getKey()) {
PhysicalDbGroup dn = dns.get(key);
if (dn != null) {
- dn.getWriteSource().setHeartbeatRecoveryTime(TimeUtil.currentTimeMillis() + time);
+ dn.getWriteDbInstance().setHeartbeatRecoveryTime(TimeUtil.currentTimeMillis() + time);
++count;
StringBuilder s = new StringBuilder();
s.append(dn.getGroupName()).append(" stop heartbeat '");
diff --git a/src/main/java/com/actiontech/dble/meta/table/AbstractTableMetaHandler.java b/src/main/java/com/actiontech/dble/meta/table/AbstractTableMetaHandler.java
index 4ff282a37..b257284b5 100644
--- a/src/main/java/com/actiontech/dble/meta/table/AbstractTableMetaHandler.java
+++ b/src/main/java/com/actiontech/dble/meta/table/AbstractTableMetaHandler.java
@@ -57,7 +57,7 @@ public abstract class AbstractTableMetaHandler {
return;
}
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(shardingNode);
- PhysicalDbInstance ds = dn.getDbGroup().getWriteSource();
+ PhysicalDbInstance ds = dn.getDbGroup().getWriteDbInstance();
String sql = SQL_PREFIX + "`" + tableName + "`";
if (ds.isAlive()) {
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(MYSQL_SHOW_CREATE_TABLE_COLS, new MySQLTableStructureListener(shardingNode, System.currentTimeMillis(), ds));
diff --git a/src/main/java/com/actiontech/dble/meta/table/DryRunGetNodeTablesHandler.java b/src/main/java/com/actiontech/dble/meta/table/DryRunGetNodeTablesHandler.java
index e6e8a65c3..db557dc2a 100644
--- a/src/main/java/com/actiontech/dble/meta/table/DryRunGetNodeTablesHandler.java
+++ b/src/main/java/com/actiontech/dble/meta/table/DryRunGetNodeTablesHandler.java
@@ -43,7 +43,7 @@ public class DryRunGetNodeTablesHandler extends GetNodeTablesHandler {
public void execute() {
String mysqlShowTableCol = "Tables_in_" + phyShardingNode.getDatabase();
String[] mysqlShowTableCols = new String[]{mysqlShowTableCol};
- PhysicalDbInstance tds = phyShardingNode.getDbGroup().getWriteSource();
+ PhysicalDbInstance tds = phyShardingNode.getDbGroup().getWriteDbInstance();
PhysicalDbInstance ds = null;
if (tds != null) {
if (tds.isTestConnSuccess()) {
diff --git a/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java b/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java
index 43c62c74d..4bd57bb44 100644
--- a/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java
+++ b/src/main/java/com/actiontech/dble/meta/table/GetNodeTablesHandler.java
@@ -50,7 +50,7 @@ public abstract class GetNodeTablesHandler {
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(shardingNode);
String mysqlShowTableCol = "Tables_in_" + dn.getDatabase();
String[] mysqlShowTableCols = new String[]{mysqlShowTableCol, "Table_type"};
- PhysicalDbInstance ds = dn.getDbGroup().getWriteSource();
+ PhysicalDbInstance ds = dn.getDbGroup().getWriteDbInstance();
if (ds.isAlive()) {
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(mysqlShowTableCols, new MySQLShowTablesListener(mysqlShowTableCol, dn.getDatabase(), ds));
SQLJob sqlJob = new SQLJob(sql, dn.getDatabase(), resultHandler, ds);
diff --git a/src/main/java/com/actiontech/dble/meta/table/GetTableMetaHandler.java b/src/main/java/com/actiontech/dble/meta/table/GetTableMetaHandler.java
index 659ea3332..1df647e77 100644
--- a/src/main/java/com/actiontech/dble/meta/table/GetTableMetaHandler.java
+++ b/src/main/java/com/actiontech/dble/meta/table/GetTableMetaHandler.java
@@ -49,7 +49,7 @@ public abstract class GetTableMetaHandler {
sbSql.append(SQL_SHOW_CREATE_TABLE.replace("{0}", table));
}
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(shardingNode);
- PhysicalDbInstance ds = dn.getDbGroup().getWriteSource();
+ PhysicalDbInstance ds = dn.getDbGroup().getWriteDbInstance();
if (ds.isAlive()) {
logger.info("dbInstance is alive start sqljob for shardingNode:" + shardingNode);
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(MYSQL_SHOW_CREATE_TABLE_COLS, new TableStructureListener(shardingNode, tables, ds));
diff --git a/src/main/java/com/actiontech/dble/net/AbstractConnection.java b/src/main/java/com/actiontech/dble/net/AbstractConnection.java
index b871b9f32..410d7bf13 100644
--- a/src/main/java/com/actiontech/dble/net/AbstractConnection.java
+++ b/src/main/java/com/actiontech/dble/net/AbstractConnection.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.net;
import com.actiontech.dble.DbleServer;
@@ -68,8 +68,6 @@ public abstract class AbstractConnection implements NIOConnection {
protected volatile Map usrVariables;
protected volatile Map sysVariables;
- private long idleTimeout;
-
private final SocketWR socketWR;
private byte[] rowData;
@@ -149,10 +147,6 @@ public abstract class AbstractConnection implements NIOConnection {
return socketWR;
}
- public void setIdleTimeout(long idleTimeout) {
- this.idleTimeout = idleTimeout;
- }
-
public int getLocalPort() {
return localPort;
}
@@ -161,7 +155,6 @@ public abstract class AbstractConnection implements NIOConnection {
return host;
}
-
public int getPort() {
return port;
}
@@ -178,10 +171,6 @@ public abstract class AbstractConnection implements NIOConnection {
this.id = id;
}
- public boolean isIdleTimeout() {
- return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime, lastReadTime) + idleTimeout;
- }
-
public Map getUsrVariables() {
return usrVariables;
}
@@ -194,7 +183,6 @@ public abstract class AbstractConnection implements NIOConnection {
return channel;
}
-
public void setReadBufferChunk(int readBufferChunk) {
this.readBufferChunk = readBufferChunk;
}
@@ -551,6 +539,7 @@ public abstract class AbstractConnection implements NIOConnection {
if (processor != null) {
processor.removeConnection(this);
}
+
this.cleanup();
isSupportCompress = false;
@@ -574,13 +563,6 @@ public abstract class AbstractConnection implements NIOConnection {
return isClosed;
}
- public void idleCheck() {
- if (isIdleTimeout()) {
- LOGGER.info(toString() + " idle timeout");
- close(" idle ");
- }
- }
-
protected synchronized void cleanup() {
if (readBuffer != null) {
@@ -677,7 +659,7 @@ public abstract class AbstractConnection implements NIOConnection {
}
public void onConnectFinish() {
- LOGGER.debug("The backend conntinon has finished connecting");
+ LOGGER.debug("The backend connection has finished connecting");
}
public void setSocketParams(boolean isFrontChannel) throws IOException {
@@ -702,7 +684,6 @@ public abstract class AbstractConnection implements NIOConnection {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
this.setMaxPacketSize(system.getMaxPacketSize());
- this.setIdleTimeout(system.getIdleTimeout());
this.initCharacterSet(system.getCharset());
this.setReadBufferChunk(soRcvBuf);
}
@@ -756,8 +737,7 @@ public abstract class AbstractConnection implements NIOConnection {
}
/*
- start flow control because of the write queue in this connection to long
-
+ * start flow control because of the write queue in this connection to long
*/
public abstract void startFlowControl(BackendConnection bcon);
diff --git a/src/main/java/com/actiontech/dble/net/ClosableConnection.java b/src/main/java/com/actiontech/dble/net/ClosableConnection.java
index d0916c894..2d34ba654 100644
--- a/src/main/java/com/actiontech/dble/net/ClosableConnection.java
+++ b/src/main/java/com/actiontech/dble/net/ClosableConnection.java
@@ -1,8 +1,8 @@
/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
package com.actiontech.dble.net;
import com.actiontech.dble.net.mysql.CharsetNames;
@@ -17,8 +17,6 @@ public interface ClosableConnection {
boolean isClosed();
- void idleCheck();
-
long getStartupTime();
String getHost();
diff --git a/src/main/java/com/actiontech/dble/net/FrontendConnection.java b/src/main/java/com/actiontech/dble/net/FrontendConnection.java
index cdd0136d7..b0aa7ec02 100755
--- a/src/main/java/com/actiontech/dble/net/FrontendConnection.java
+++ b/src/main/java/com/actiontech/dble/net/FrontendConnection.java
@@ -47,6 +47,7 @@ public abstract class FrontendConnection extends AbstractConnection {
protected FrontendQueryHandler queryHandler;
protected String executeSql;
+ protected final long idleTimeout = SystemConfig.getInstance().getIdleTimeout();
public FrontendConnection(NetworkChannel channel) throws IOException {
super(channel);
diff --git a/src/main/java/com/actiontech/dble/net/NIOProcessor.java b/src/main/java/com/actiontech/dble/net/NIOProcessor.java
index 1c9c7d718..0d6907bdc 100644
--- a/src/main/java/com/actiontech/dble/net/NIOProcessor.java
+++ b/src/main/java/com/actiontech/dble/net/NIOProcessor.java
@@ -9,6 +9,7 @@ import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage;
import com.actiontech.dble.backend.mysql.xa.TxState;
+import com.actiontech.dble.backend.pool.PooledEntry;
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.server.ServerConnection;
@@ -18,7 +19,6 @@ import com.actiontech.dble.util.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +46,7 @@ public final class NIOProcessor {
private AtomicInteger frontEndsLength = new AtomicInteger(0);
- public NIOProcessor(String name, BufferPool bufferPool) throws IOException {
+ public NIOProcessor(String name, BufferPool bufferPool) {
this.name = name;
this.bufferPool = bufferPool;
this.frontends = new ConcurrentHashMap<>();
@@ -145,19 +145,21 @@ public final class NIOProcessor {
} else {
// very important ,for some data maybe not sent
checkConSendQueue(c);
- if (c instanceof ServerConnection && c.isIdleTimeout()) {
+ if (c instanceof ServerConnection) {
ServerConnection s = (ServerConnection) c;
- String xaStage = s.getSession2().getTransactionManager().getXAStage();
- if (xaStage != null) {
- if (!xaStage.equals(XAStage.COMMIT_FAIL_STAGE) && !xaStage.equals(XAStage.ROLLBACK_FAIL_STAGE)) {
- // Active/IDLE/PREPARED XA FrontendS will be rollbacked
- s.close("Idle Timeout");
- XASessionCheck.getInstance().addRollbackSession(s.getSession2());
+ if (s.isIdleTimeout()) {
+ String xaStage = s.getSession2().getTransactionManager().getXAStage();
+ if (xaStage != null) {
+ if (!xaStage.equals(XAStage.COMMIT_FAIL_STAGE) && !xaStage.equals(XAStage.ROLLBACK_FAIL_STAGE)) {
+ // Active/IDLE/PREPARED XA FrontendS will be rollbacked
+ s.close("Idle Timeout");
+ XASessionCheck.getInstance().addRollbackSession(s.getSession2());
+ }
+ } else {
+ s.close("idle timeout");
}
- continue;
}
}
- c.idleCheck();
}
}
}
@@ -193,7 +195,7 @@ public final class NIOProcessor {
}
}
// close the conn which executeTimeOut
- if (!c.isDDL() && c.isBorrowed() && c.isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
+ if (!c.isDDL() && c.getState() == PooledEntry.STATE_IN_USE && c.isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
LOGGER.info("found backend connection SQL timeout ,close it " + c);
c.close("sql timeout");
}
@@ -206,7 +208,6 @@ public final class NIOProcessor {
if (c instanceof AbstractConnection) {
checkConSendQueue((AbstractConnection) c);
}
- c.idleCheck();
}
}
}
diff --git a/src/main/java/com/actiontech/dble/net/NIOSocketWR.java b/src/main/java/com/actiontech/dble/net/NIOSocketWR.java
index 95d642166..23fd4c3b9 100644
--- a/src/main/java/com/actiontech/dble/net/NIOSocketWR.java
+++ b/src/main/java/com/actiontech/dble/net/NIOSocketWR.java
@@ -9,8 +9,6 @@ import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.config.FlowControllerConfig;
import com.actiontech.dble.singleton.WriteQueueFlowController;
import com.actiontech.dble.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -21,7 +19,7 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
public class NIOSocketWR extends SocketWR {
- private static final Logger LOGGER = LoggerFactory.getLogger(NIOSocketWR.class);
+
private SelectionKey processKey;
private static final int OP_NOT_READ = ~SelectionKey.OP_READ;
private static final int OP_NOT_WRITE = ~SelectionKey.OP_WRITE;
diff --git a/src/main/java/com/actiontech/dble/net/factory/BackendConnectionFactory.java b/src/main/java/com/actiontech/dble/net/factory/BackendConnectionFactory.java
deleted file mode 100644
index d3e1480cb..000000000
--- a/src/main/java/com/actiontech/dble/net/factory/BackendConnectionFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-* Copyright (C) 2016-2020 ActionTech.
-* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
-* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
-*/
-package com.actiontech.dble.net.factory;
-
-import com.actiontech.dble.DbleServer;
-
-import java.io.IOException;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.NetworkChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- * @author mycat
- */
-public abstract class BackendConnectionFactory {
-
- protected NetworkChannel openSocketChannel(boolean isAIO)
- throws IOException {
- if (isAIO) {
- return AsynchronousSocketChannel.open(DbleServer.getInstance().getNextAsyncChannelGroup());
- } else {
- SocketChannel channel = null;
- channel = SocketChannel.open();
- channel.configureBlocking(false);
- return channel;
- }
-
- }
-
-}
diff --git a/src/main/java/com/actiontech/dble/net/handler/ServerUserAuthenticator.java b/src/main/java/com/actiontech/dble/net/handler/ServerUserAuthenticator.java
index 61f9a8ddd..ad321ac9b 100644
--- a/src/main/java/com/actiontech/dble/net/handler/ServerUserAuthenticator.java
+++ b/src/main/java/com/actiontech/dble/net/handler/ServerUserAuthenticator.java
@@ -49,7 +49,7 @@ public class ServerUserAuthenticator extends FrontendAuthenticator {
sc.setSchema(auth.getDatabase());
sc.initCharsetIndex(auth.getCharsetIndex());
sc.setHandler(new ShardingUserCommandHandler(sc));
- sc.setMultStatementAllow(auth.isMultStatementAllow());
+ sc.setMultiStatementAllow(auth.isMultStatementAllow());
sc.setClientFlags(auth.getClientFlags());
boolean clientCompress = Capabilities.CLIENT_COMPRESS == (Capabilities.CLIENT_COMPRESS & auth.getClientFlags());
boolean usingCompress = SystemConfig.getInstance().getUseCompression() == 1;
diff --git a/src/main/java/com/actiontech/dble/route/util/RouterUtil.java b/src/main/java/com/actiontech/dble/route/util/RouterUtil.java
index 364a09f51..c212c312d 100644
--- a/src/main/java/com/actiontech/dble/route/util/RouterUtil.java
+++ b/src/main/java/com/actiontech/dble/route/util/RouterUtil.java
@@ -339,7 +339,7 @@ public final class RouterUtil {
ArrayList x = new ArrayList<>(shardingNodes);
Map shardingNodeMap = DbleServer.getInstance().getConfig().getShardingNodes();
while (x.size() > 1) {
- if (shardingNodeMap.get(x.get(index)).getDbGroup().getWriteSource().isAlive()) {
+ if (shardingNodeMap.get(x.get(index)).getDbGroup().getWriteDbInstance().isAlive()) {
return x.get(index);
}
x.remove(index);
diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
index 163bf618a..eb993e0c3 100644
--- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
+++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
@@ -758,22 +758,17 @@ public class NonBlockingSession implements Session {
public void releaseConnection(RouteResultsetNode rrn, boolean debug, final boolean needClose) {
if (rrn != null) {
BackendConnection c = target.remove(rrn);
- if (c != null) {
- if (debug) {
- LOGGER.debug("release connection " + c);
+ if (c != null && !c.isClosed()) {
+ if (source.isFlowControlled()) {
+ releaseConnectionFromFlowCntrolled(c);
}
- if (!c.isClosed()) {
- if (source.isFlowControlled()) {
- releaseConnectionFromFlowCntrolled(c);
- }
- if (c.isAutocommit()) {
- c.release();
- } else if (needClose) {
- //c.rollback();
- c.close("the need to be closed");
- } else {
- c.release();
- }
+ if (c.isAutocommit()) {
+ c.release();
+ } else if (needClose) {
+ //c.rollback();
+ c.close("the need to be closed");
+ } else {
+ c.release();
}
}
}
@@ -786,13 +781,9 @@ public class NonBlockingSession implements Session {
if (theCon == con) {
iterator.remove();
con.release();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("release connection " + con);
- }
break;
}
}
-
}
public void waitFinishConnection(RouteResultsetNode rrn) {
@@ -862,14 +853,11 @@ public class NonBlockingSession implements Session {
}
for (Entry en : toKilled.entrySet()) {
- KillConnectionHandler kill = new KillConnectionHandler(
- en.getValue(), this);
+ KillConnectionHandler kill = new KillConnectionHandler(en.getValue(), this);
ServerConfig conf = DbleServer.getInstance().getConfig();
- ShardingNode dn = conf.getShardingNodes().get(
- en.getKey().getName());
+ ShardingNode dn = conf.getShardingNodes().get(en.getKey().getName());
try {
- dn.getConnectionFromSameSource(en.getValue().getSchema(), true, en.getValue(),
- kill, en.getKey());
+ dn.getConnectionFromSameSource(en.getValue().getSchema(), en.getValue(), kill, en.getKey());
} catch (Exception e) {
LOGGER.info("get killer connection failed for " + en.getKey(), e);
kill.connectionError(e, null);
@@ -915,7 +903,7 @@ public class NonBlockingSession implements Session {
ServerConfig conf = DbleServer.getInstance().getConfig();
ShardingNode dn = conf.getShardingNodes().get(node.getName());
try {
- MySQLConnection newConn = (MySQLConnection) dn.getConnection(dn.getDatabase(), errConn.isAutocommit(), false, errConn.getAttachment());
+ MySQLConnection newConn = (MySQLConnection) dn.getConnection(dn.getDatabase(), false, errConn.getAttachment());
newConn.setXaStatus(errConn.getXaStatus());
newConn.setSession(this);
if (!newConn.setResponseHandler(queryHandler)) {
diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java
index d9543710e..79934e3e4 100644
--- a/src/main/java/com/actiontech/dble/server/ServerConnection.java
+++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java
@@ -79,11 +79,10 @@ public class ServerConnection extends FrontendConnection {
private FrontendPrepareHandler prepareHandler;
private LoadDataInfileHandler loadDataInfileHandler;
private boolean sessionReadOnly = false;
- private volatile boolean multStatementAllow = false;
+ private volatile boolean multiStatementAllow = false;
private ServerUserConfig userConfig;
- public ServerConnection(NetworkChannel channel)
- throws IOException {
+ public ServerConnection(NetworkChannel channel) throws IOException {
super(channel);
this.handler = new ServerUserAuthenticator(this);
@@ -121,13 +120,11 @@ public class ServerConnection extends FrontendConnection {
this.userConfig = userConfig;
}
- @Override
public boolean isIdleTimeout() {
if (isAuthenticated) {
- return super.isIdleTimeout();
+ return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime, lastReadTime) + idleTimeout;
} else {
- return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime,
- lastReadTime) + AUTH_TIMEOUT;
+ return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime, lastReadTime) + AUTH_TIMEOUT;
}
}
@@ -216,15 +213,14 @@ public class ServerConnection extends FrontendConnection {
this.loadDataInfileHandler = loadDataInfileHandler;
}
- public boolean isMultStatementAllow() {
- return multStatementAllow;
+ public boolean isMultiStatementAllow() {
+ return multiStatementAllow;
}
- public void setMultStatementAllow(boolean multStatementAllow) {
- this.multStatementAllow = multStatementAllow;
+ public void setMultiStatementAllow(boolean multiStatementAllow) {
+ this.multiStatementAllow = multiStatementAllow;
}
-
public void setPrepareHandler(FrontendPrepareHandler prepareHandler) {
this.prepareHandler = prepareHandler;
}
@@ -524,7 +520,6 @@ public class ServerConnection extends FrontendConnection {
}
-
public void stmtPrepare(byte[] data) {
if (prepareHandler != null) {
MySQLMessage mm = new MySQLMessage(data);
@@ -566,11 +561,11 @@ public class ServerConnection extends FrontendConnection {
mm.position(5);
int optCommand = mm.readUB2();
if (optCommand == 0) {
- this.multStatementAllow = true;
+ this.multiStatementAllow = true;
write(writeToBuffer(EOFPacket.EOF, allocate()));
return;
} else if (optCommand == 1) {
- this.multStatementAllow = false;
+ this.multiStatementAllow = false;
write(writeToBuffer(EOFPacket.EOF, allocate()));
return;
}
@@ -844,6 +839,9 @@ public class ServerConnection extends FrontendConnection {
@Override
public synchronized void close(String reason) {
+ if (isClosed) {
+ return;
+ }
super.close(reason);
if (session != null) {
TsQueriesCounter.getInstance().addToHistory(session);
diff --git a/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java b/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java
index e83708de0..8f8bd2974 100644
--- a/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java
+++ b/src/main/java/com/actiontech/dble/server/ServerQueryHandler.java
@@ -56,7 +56,7 @@ public class ServerQueryHandler implements FrontendQueryHandler {
sql = source.getSession2().getRemingSql();
}
//Preliminary judgment of multi statement
- if (source.isMultStatementAllow() && source.getSession2().generalNextStatement(sql)) {
+ if (source.isMultiStatementAllow() && source.getSession2().generalNextStatement(sql)) {
sql = sql.substring(0, ParseUtil.findNextBreak(sql));
}
source.setExecuteSql(sql);
diff --git a/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java b/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java
index 4a4b86c02..6ba33629e 100644
--- a/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java
+++ b/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java
@@ -28,7 +28,7 @@ public class VarsExtractorHandler {
private Condition done;
private Map dbGroups;
private volatile SystemVariables systemVariables = null;
- private PhysicalDbInstance usedDataource = null;
+ private PhysicalDbInstance usedDbInstance = null;
public VarsExtractorHandler(Map dbGroups) {
this.dbGroups = dbGroups;
@@ -40,7 +40,7 @@ public class VarsExtractorHandler {
public SystemVariables execute() {
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(MYSQL_SHOW_VARIABLES_COLS, new MysqlVarsListener(this));
PhysicalDbInstance ds = getPhysicalDbInstance();
- this.usedDataource = ds;
+ this.usedDbInstance = ds;
if (ds != null) {
OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_VARIABLES, null, resultHandler, ds);
sqlJob.run();
@@ -54,7 +54,7 @@ public class VarsExtractorHandler {
private PhysicalDbInstance getPhysicalDbInstance() {
PhysicalDbInstance ds = null;
for (PhysicalDbGroup dbGroup : dbGroups.values()) {
- PhysicalDbInstance dsTest = dbGroup.getWriteSource();
+ PhysicalDbInstance dsTest = dbGroup.getWriteDbInstance();
if (dsTest.isTestConnSuccess()) {
ds = dsTest;
}
@@ -112,11 +112,11 @@ public class VarsExtractorHandler {
}
}
- public PhysicalDbInstance getUsedDataource() {
- return usedDataource;
+ public PhysicalDbInstance getUsedDbInstance() {
+ return usedDbInstance;
}
- public void setUsedDataource(PhysicalDbInstance usedDataource) {
- this.usedDataource = usedDataource;
+ public void setUsedDbInstance(PhysicalDbInstance usedDbInstance) {
+ this.usedDbInstance = usedDbInstance;
}
}
diff --git a/src/main/java/com/actiontech/dble/singleton/Scheduler.java b/src/main/java/com/actiontech/dble/singleton/Scheduler.java
index feb3a4632..08b317016 100644
--- a/src/main/java/com/actiontech/dble/singleton/Scheduler.java
+++ b/src/main/java/com/actiontech/dble/singleton/Scheduler.java
@@ -2,8 +2,8 @@ package com.actiontech.dble.singleton;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
-import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.mysql.xa.XAStateLog;
+import com.actiontech.dble.backend.pool.PooledEntry;
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.user.UserName;
@@ -36,26 +36,26 @@ public final class Scheduler {
private static final long DEFAULT_OLD_CONNECTION_CLEAR_PERIOD = 5 * 1000L;
private static final long DEFAULT_SQL_STAT_RECYCLE_PERIOD = 5 * 1000L;
private ExecutorService timerExecutor;
+ private ScheduledExecutorService scheduledExecutor;
+
+ private Scheduler() {
+ this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
+ }
public void init(ExecutorService executor) {
this.timerExecutor = executor;
- ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
- long shardingNodeIdleCheckPeriod = SystemConfig.getInstance().getShardingNodeIdleCheckPeriod();
- scheduler.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
- scheduler.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
- scheduler.scheduleAtFixedRate(shardingNodeConHeartBeatCheck(shardingNodeIdleCheckPeriod), 0L, shardingNodeIdleCheckPeriod, TimeUnit.MILLISECONDS);
- //dbGroup heartBeat will be influence by dbGroupWithoutWR
- scheduler.scheduleAtFixedRate(dbInstanceHeartbeat(), 0L, SystemConfig.getInstance().getShardingNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);
- scheduler.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
- scheduler.scheduleWithFixedDelay(xaSessionCheck(), 0L, SystemConfig.getInstance().getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
- scheduler.scheduleWithFixedDelay(xaLogClean(), 0L, SystemConfig.getInstance().getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
- scheduler.scheduleWithFixedDelay(resultSetMapClear(), 0L, SystemConfig.getInstance().getClearBigSQLResultSetMapMs(), TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleWithFixedDelay(xaSessionCheck(), 0L, SystemConfig.getInstance().getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleWithFixedDelay(xaLogClean(), 0L, SystemConfig.getInstance().getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleWithFixedDelay(resultSetMapClear(), 0L, SystemConfig.getInstance().getClearBigSQLResultSetMapMs(), TimeUnit.MILLISECONDS);
if (SystemConfig.getInstance().getUseSqlStat() == 1) {
//sql record detail timing clean
- scheduler.scheduleWithFixedDelay(recycleSqlStat(), 0L, DEFAULT_SQL_STAT_RECYCLE_PERIOD, TimeUnit.MILLISECONDS);
+ scheduledExecutor.scheduleWithFixedDelay(recycleSqlStat(), 0L, DEFAULT_SQL_STAT_RECYCLE_PERIOD, TimeUnit.MILLISECONDS);
}
- scheduler.scheduleAtFixedRate(threadStatRenew(), 0L, 1, TimeUnit.SECONDS);
- scheduler.scheduleAtFixedRate(printLongTimeDDL(), 0L, DDL_EXECUTE_CHECK_PERIOD, TimeUnit.SECONDS);
+ scheduledExecutor.scheduleAtFixedRate(threadStatRenew(), 0L, 1, TimeUnit.SECONDS);
+ scheduledExecutor.scheduleAtFixedRate(printLongTimeDDL(), 0L, DDL_EXECUTE_CHECK_PERIOD, TimeUnit.SECONDS);
}
private Runnable printLongTimeDDL() {
@@ -76,45 +76,6 @@ public final class Scheduler {
};
}
- private Runnable shardingNodeConHeartBeatCheck(final long heartPeriod) {
- return new Runnable() {
- @Override
- public void run() {
- timerExecutor.execute(new Runnable() {
- @Override
- public void run() {
-
- Map nodes = DbleServer.getInstance().getConfig().getDbGroups();
- for (PhysicalDbGroup node : nodes.values()) {
- node.heartbeatCheck(heartPeriod);
- }
- }
- });
- }
- };
- }
-
- // heartbeat for dbInstance
- private Runnable dbInstanceHeartbeat() {
- return new Runnable() {
- @Override
- public void run() {
- timerExecutor.execute(new Runnable() {
- @Override
- public void run() {
- if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
- Map hosts = DbleServer.getInstance().getConfig().getDbGroups();
- for (PhysicalDbGroup host : hosts.values()) {
- host.doHeartbeat();
- }
- }
- }
- });
- }
- };
- }
-
-
/**
* after reload @@config_all ,clean old connection
*/
@@ -133,7 +94,7 @@ public final class Scheduler {
while (iterator.hasNext()) {
BackendConnection con = iterator.next();
long lastTime = con.getLastTime();
- if (con.isClosed() || !con.isBorrowed() || currentTime - lastTime > sqlTimeout) {
+ if (con.isClosed() || con.getState() != PooledEntry.STATE_IN_USE || currentTime - lastTime > sqlTimeout) {
con.close("clear old backend connection ...");
iterator.remove();
}
@@ -239,6 +200,9 @@ public final class Scheduler {
return timerExecutor;
}
+ public ScheduledExecutorService getScheduledExecutor() {
+ return scheduledExecutor;
+ }
public static Scheduler getInstance() {
return INSTANCE;
diff --git a/src/main/java/com/actiontech/dble/sqlengine/MultiTablesMetaJob.java b/src/main/java/com/actiontech/dble/sqlengine/MultiTablesMetaJob.java
index 63ba816c8..20dfa2745 100644
--- a/src/main/java/com/actiontech/dble/sqlengine/MultiTablesMetaJob.java
+++ b/src/main/java/com/actiontech/dble/sqlengine/MultiTablesMetaJob.java
@@ -66,7 +66,7 @@ public class MultiTablesMetaJob implements ResponseHandler, Runnable {
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(node.getName());
dn.getConnection(dn.getDatabase(), isMustWriteNode, true, node, this, node);
} else {
- ds.getConnection(schema, true, this, null, false);
+ ds.getConnection(schema, this, null, false);
}
} catch (Exception e) {
logger.warn("can't get connection" + shardingNode, e);
diff --git a/src/main/java/com/actiontech/dble/sqlengine/OneTimeConnJob.java b/src/main/java/com/actiontech/dble/sqlengine/OneTimeConnJob.java
index 587b6885b..0659e7136 100644
--- a/src/main/java/com/actiontech/dble/sqlengine/OneTimeConnJob.java
+++ b/src/main/java/com/actiontech/dble/sqlengine/OneTimeConnJob.java
@@ -34,7 +34,7 @@ public class OneTimeConnJob extends SQLJob {
public void run() {
try {
- ds.getNewConnection(schema, this, null, false, true);
+ ds.createConnectionSkipPool(schema, this);
} catch (Exception e) {
this.connectionError(e, null);
}
diff --git a/src/main/java/com/actiontech/dble/sqlengine/SQLJob.java b/src/main/java/com/actiontech/dble/sqlengine/SQLJob.java
index 9a5301b26..eee74e452 100644
--- a/src/main/java/com/actiontech/dble/sqlengine/SQLJob.java
+++ b/src/main/java/com/actiontech/dble/sqlengine/SQLJob.java
@@ -69,7 +69,7 @@ public class SQLJob implements ResponseHandler, Runnable, Cloneable {
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(node.getName());
dn.getConnection(dn.getDatabase(), isMustWriteNode, true, node, this, node);
} else {
- ds.getConnection(schema, true, this, null, isMustWriteNode);
+ ds.getConnection(schema, this, null, isMustWriteNode);
}
} catch (Exception e) {
LOGGER.warn("can't get connection", e);
diff --git a/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java b/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java
index 7a081921b..365a731f3 100644
--- a/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java
+++ b/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java
@@ -47,8 +47,8 @@ public class SetTestJob implements ResponseHandler, Runnable {
try {
Map dbGroups = DbleServer.getInstance().getConfig().getDbGroups();
for (PhysicalDbGroup dbGroup : dbGroups.values()) {
- if (dbGroup.getWriteSource().isAlive()) {
- dbGroup.getWriteSource().getConnection(databaseName, true, this, null, false);
+ if (dbGroup.getWriteDbInstance().isAlive()) {
+ dbGroup.getWriteDbInstance().getConnection(databaseName, this, null, false);
sendTest = true;
break;
}
diff --git a/src/main/java/com/actiontech/dble/sqlengine/SpecialSqlJob.java b/src/main/java/com/actiontech/dble/sqlengine/SpecialSqlJob.java
index e0df1c6af..c554f1478 100644
--- a/src/main/java/com/actiontech/dble/sqlengine/SpecialSqlJob.java
+++ b/src/main/java/com/actiontech/dble/sqlengine/SpecialSqlJob.java
@@ -53,7 +53,7 @@ public class SpecialSqlJob extends SQLJob {
@Override
public void run() {
try {
- ds.getConnection(schema, true, sqlJob, null, false);
+ ds.getConnection(schema, sqlJob, null, false);
} catch (Exception e) {
sqlJob.connectionError(e, null);
}
diff --git a/src/main/java/com/actiontech/dble/sqlengine/TransformSQLJob.java b/src/main/java/com/actiontech/dble/sqlengine/TransformSQLJob.java
index 42df51943..deac7fabe 100644
--- a/src/main/java/com/actiontech/dble/sqlengine/TransformSQLJob.java
+++ b/src/main/java/com/actiontech/dble/sqlengine/TransformSQLJob.java
@@ -49,7 +49,7 @@ public class TransformSQLJob implements ResponseHandler, Runnable {
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(node.getName());
dn.getConnection(dn.getDatabase(), false, true, node, this, node);
} else {
- ds.getConnection(databaseName, true, this, null, false);
+ ds.getConnection(databaseName, this, null, false);
}
} catch (Exception e) {
LOGGER.warn("can't get connection", e);