fix: reselect when dbGroup fails (#3114)

* fix: reselect when dbGroup fails

fix: set volatile, remove cas

fix: remove Unused import

fix: findbugs

# Conflicts:
#	src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java

* docs: add notes
This commit is contained in:
LUA
2022-01-26 11:13:49 +08:00
committed by GitHub
parent a754d72fa4
commit 5378bdda6a
8 changed files with 149 additions and 39 deletions
@@ -20,9 +20,12 @@ import com.actiontech.dble.config.helper.KeyVariables;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.Session;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.singleton.HaConfigManager;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
@@ -59,6 +62,13 @@ public class PhysicalDbGroup {
private boolean shardingUseless = true;
private boolean rwSplitUseless = true;
// rw-split sessions currently bound to this group (see bindRwSplitSession / unBindRwSplitSession)
private Set<Session> rwSplitSessionSet = Sets.newConcurrentHashSet();
// lifecycle state of this group; holds INITIAL, STATE_ABANDONED or STATE_DELETING
private volatile Integer state = Integer.valueOf(INITIAL);
// set by checkState() when a stop is requested while sessions are still bound
public static final int STATE_DELETING = 2;
// set by checkState() when a stop is requested, the state is INITIAL and no session is bound
public static final int STATE_ABANDONED = 1;
// starting value of state; isStop() reports false only in this state
public static final int INITIAL = 0;
public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) {
this.groupName = name;
@@ -169,7 +179,7 @@ public class PhysicalDbGroup {
}
}
public void init(List<String> sourceNames, String reason) {
public void startOfFresh(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
@@ -177,17 +187,39 @@ public class PhysicalDbGroup {
}
}
/**
 * Decides whether this group may be stopped right now and updates {@code state} accordingly.
 *
 * @return true when no session is bound and the state was INITIAL (state becomes STATE_ABANDONED);
 * false when sessions are still bound (state becomes STATE_DELETING and the group is queued on
 * IOProcessor.BACKENDS_OLD_GROUP) or when the state had already left INITIAL
 */
private boolean checkState() {
// sessions are still bound: mark the group as deleting and queue it instead of stopping now
// NOTE(review): this check runs before the state check, so a repeated call while sessions are
// still bound adds the group to the queue again — verify duplicate entries are acceptable
if (getBindingCount() != 0) {
state = STATE_DELETING;
IOProcessor.BACKENDS_OLD_GROUP.add(this);
return false;
}
// the state already left INITIAL, so this call does not stop the group
if (state.intValue() != INITIAL) {
return false;
}
// NOTE(review): same test as the first one; presumably repeated to catch a session that was
// bound between the two checks — confirm
if (getBindingCount() != 0) {
state = STATE_DELETING;
IOProcessor.BACKENDS_OLD_GROUP.add(this);
return false;
}
state = STATE_ABANDONED;
return true;
}
/**
 * Delegates to {@link #stop(String, boolean)} with {@code closeFront} set to false.
 *
 * @param reason text passed through to the two-argument overload
 */
public void stop(String reason) {
    final boolean closeFront = false;
    stop(reason, closeFront);
}
/**
 * Stops every dbInstance of this group, provided checkState() allows it.
 *
 * @param reason     text handed to each dbInstance's stop call
 * @param closeFront flag handed to each dbInstance's stop call
 */
public void stop(String reason, boolean closeFront) {
    if (!checkState()) {
        // checkState() refused the stop; nothing is stopped by this call
        return;
    }
    for (PhysicalDbInstance instance : allSourceMap.values()) {
        instance.stop(reason, closeFront);
    }
}
public void stop(List<String> sourceNames, String reason, boolean closeFront) {
public void stopOfFresh(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
@@ -209,6 +241,22 @@ public class PhysicalDbGroup {
}
}
/**
 * Stops all dbInstances of this group if it is in STATE_DELETING and no session is bound any more.
 *
 * @param reason text handed to each dbInstance's stop call
 * @return true if the dbInstances were stopped by this call, false if the preconditions were not met
 */
public boolean stopOfBackground(String reason) {
    if (state.intValue() != STATE_DELETING || getBindingCount() != 0) {
        return false;
    }
    for (PhysicalDbInstance instance : allSourceMap.values()) {
        instance.stop(reason, false);
    }
    return true;
}
/**
 * @return true once the state is anything other than INITIAL
 */
public boolean isStop() {
    final int current = state.intValue();
    return current != INITIAL;
}
public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
return writeInstanceList;
@@ -232,22 +280,19 @@ public class PhysicalDbGroup {
return readSources;
}
public PhysicalDbInstance rwSelect(Boolean master, boolean write) throws IOException {
return select(master, false, write);
}
/**
* rwsplit user
*
* @param master
* @param write
* @return
* @throws IOException
*/
public PhysicalDbInstance select(Boolean master) throws IOException {
if (Objects.nonNull(master)) {
return select(master, false, master);
public PhysicalDbInstance rwSelect(Boolean master, Boolean write) throws IOException {
if (Objects.nonNull(write)) {
return select(master, false, write);
}
return select(master, false, false);
return select(master, false, Objects.nonNull(master) ? master : false);
}
/**
@@ -540,6 +585,18 @@ public class PhysicalDbGroup {
}
}
/**
 * @return the number of sessions currently held in rwSplitSessionSet
 */
public int getBindingCount() {
    return this.rwSplitSessionSet.size();
}
/**
 * Adds the session to rwSplitSessionSet.
 *
 * @param session the rw-split session to bind
 * @return the result of the set's add call (true if the session was not already in the set)
 */
public boolean bindRwSplitSession(RWSplitNonBlockingSession session) {
    final boolean added = rwSplitSessionSet.add(session);
    return added;
}
/**
 * Removes the session from rwSplitSessionSet.
 *
 * @param session the rw-split session to unbind
 * @return the result of the set's remove call (true if the session was in the set)
 */
public boolean unBindRwSplitSession(RWSplitNonBlockingSession session) {
    final boolean removed = rwSplitSessionSet.remove(session);
    return removed;
}
public boolean equalsBaseInfo(PhysicalDbGroup pool) {
return pool.getDbGroupConfig().getName().equals(this.dbGroupConfig.getName()) &&
pool.getDbGroupConfig().getHeartbeatSQL().equals(this.dbGroupConfig.getHeartbeatSQL()) &&
@@ -6,6 +6,7 @@
package com.actiontech.dble.net;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage;
import com.actiontech.dble.backend.mysql.xa.TxState;
import com.actiontech.dble.buffer.BufferPool;
@@ -47,6 +48,7 @@ public final class IOProcessor {
// after reload @@config_all, old backend connections are stored in BACKENDS_OLD
public static final ConcurrentLinkedQueue<PooledConnection> BACKENDS_OLD = new ConcurrentLinkedQueue<>();
// dbGroups that PhysicalDbGroup.checkState() queued because sessions were still bound when a stop was requested
public static final ConcurrentLinkedQueue<PhysicalDbGroup> BACKENDS_OLD_GROUP = new ConcurrentLinkedQueue<>();
private AtomicInteger frontEndsLength = new AtomicInteger(0);
@@ -70,7 +70,7 @@ public final class HintMasterDBHandler {
String dbGroup = rwSplitUserConfig.getDbGroup();
PhysicalDbInstance dbInstance;
try {
dbInstance = DbleServer.getInstance().getConfig().getDbGroups().get(dbGroup).select(isRouteToMaster);
dbInstance = DbleServer.getInstance().getConfig().getDbGroups().get(dbGroup).rwSelect(isRouteToMaster, null);
} catch (IOException e) {
throw new SQLNonTransientException(e);
}
@@ -5,10 +5,12 @@
package com.actiontech.dble.rwsplit;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.net.Session;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
@@ -36,6 +38,7 @@ public class RWSplitNonBlockingSession extends Session {
private volatile boolean preSendIsWrite = false; // Has the previous SQL been delivered to the write node?
private volatile long preWriteResponseTime = 0; // Response time of the previous write node
private int reSelectNum;
public RWSplitNonBlockingSession(RWSplitService service) {
this.rwSplitService = service;
@@ -58,27 +61,7 @@ public class RWSplitNonBlockingSession extends Session {
try {
RWSplitHandler handler = getRwSplitHandler(originPacket, callback);
if (handler == null) return;
Boolean isMaster = canRunOnMaster(master); // first
boolean firstValue = isMaster == null ? false : isMaster;
long rwStickyTime = SystemConfig.getInstance().getRwStickyTime();
if ((rwStickyTime > 0) && !firstValue) {
if (this.getPreWriteResponseTime() > 0 && System.currentTimeMillis() - this.getPreWriteResponseTime() <= rwStickyTime) {
isMaster = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("because in the sticky time rangeso select write instance");
}
} else {
resetLastSqlResponseTime();
}
}
PhysicalDbInstance instance = rwGroup.select(isMaster); // second
boolean isWrite = !instance.isReadInstance();
this.setPreSendIsWrite(isWrite && firstValue); // ensure that the first and second results are write instances
checkDest(isWrite);
instance.getConnection(rwSplitService.getSchema(), handler, null, false);
} catch (IOException e) {
LOGGER.warn("select conn error", e);
rwSplitService.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, e.getMessage());
getConnection(handler, master, null);
} catch (SQLSyntaxErrorException se) {
rwSplitService.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, se.getMessage());
}
@@ -88,6 +71,14 @@ public class RWSplitNonBlockingSession extends Session {
try {
RWSplitHandler handler = getRwSplitHandler(originPacket, callback);
if (handler == null) return;
getConnection(handler, master, isWrite(write));
} catch (SQLSyntaxErrorException se) {
rwSplitService.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, se.getMessage());
}
}
public void getConnection(RWSplitHandler handler, Boolean master, Boolean write) {
try {
Boolean isMaster = canRunOnMaster(master); // first
boolean firstValue = isMaster == null ? false : isMaster;
long rwStickyTime = SystemConfig.getInstance().getRwStickyTime();
@@ -101,14 +92,21 @@ public class RWSplitNonBlockingSession extends Session {
resetLastSqlResponseTime();
}
}
PhysicalDbInstance instance = rwGroup.rwSelect(canRunOnMaster(isMaster), isWrite(write));
checkDest(!instance.isReadInstance());
PhysicalDbInstance instance = reSelectRWDbGroup(rwGroup).rwSelect(isMaster, write); // second
boolean isWrite = !instance.isReadInstance();
this.setPreSendIsWrite(isWrite && firstValue); // ensure that the first and second results are write instances
checkDest(isWrite);
instance.getConnection(rwSplitService.getSchema(), handler, null, false);
} catch (SQLSyntaxErrorException se) {
rwGroup.unBindRwSplitSession(this);
rwSplitService.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, se.getMessage());
} catch (IOException e) {
LOGGER.warn("select conn error", e);
rwGroup.unBindRwSplitSession(this);
rwSplitService.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, e.getMessage());
} catch (SQLSyntaxErrorException se) {
rwSplitService.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, se.getMessage());
} catch (Exception | Error e) {
rwGroup.unBindRwSplitSession(this);
throw e;
}
}
@@ -154,6 +152,30 @@ public class RWSplitNonBlockingSession extends Session {
throw new SQLSyntaxErrorException("unexpected dble_dest_expect,real[" + (isMaster ? "M" : "S") + "],expect[" + dest + "]");
}
/**
 * Binds this session to the given dbGroup; if that group reports isStop(), unbinds again and
 * retries with the group currently registered under the user's dbGroup name.
 *
 * @param dbGroup the group to try first
 * @return the group this session ended up bound to (also stored in rwGroup)
 * @throws ConfigException if no group is registered under the configured name, or if
 *                         reSelectNum has reached 10 when another stopped group is met
 */
public PhysicalDbGroup reSelectRWDbGroup(PhysicalDbGroup dbGroup) {
    // bind before checking the state, then undo the binding if the group turns out to be stopped
    dbGroup.bindRwSplitSession(this);
    if (!dbGroup.isStop()) {
        reSelectNum = 0;
        this.rwGroup = dbGroup;
        return dbGroup;
    }

    dbGroup.unBindRwSplitSession(this);
    final String groupName = rwSplitService.getUserConfig().getDbGroup();
    if (reSelectNum == 10) {
        // retry budget used up: reset the counter and give up
        reSelectNum = 0;
        LOGGER.warn("dbGroup`{}` is always invalid", groupName);
        throw new ConfigException("the dbGroup`" + groupName + "` is always invalid, pls check reason");
    }

    final PhysicalDbGroup newDbGroup = DbleServer.getInstance().getConfig().getDbGroups().get(groupName);
    if (newDbGroup == null) {
        LOGGER.warn("dbGroup`{}` is invalid", groupName);
        throw new ConfigException("the dbGroup`" + groupName + "` is invalid");
    }
    reSelectNum++;
    return reSelectRWDbGroup(newDbGroup);
}
/**
 * @return the dbGroup currently stored in rwGroup
 */
public PhysicalDbGroup getRwGroup() {
    return this.rwGroup;
}
@@ -211,6 +233,9 @@ public class RWSplitNonBlockingSession extends Session {
}
public void close(String reason) {
if (null != rwGroup) {
rwGroup.unBindRwSplitSession(this);
}
if (conn != null) {
conn.close(reason);
}
@@ -56,8 +56,8 @@ public final class FreshBackendConn {
if (dh.getRwSplitMode() == PhysicalDbGroup.RW_SPLIT_OFF && sourceNames.size() > 1 && sourceNames.contains(dh.getWriteDbInstance().getName())) {
warnMsg = "the rwSplitMode of this dbGroup is 0, so connection pool for slave dbInstance don't refresh";
}
dh.stop(sourceNames, "fresh backend conn", isForced);
dh.init(sourceNames, "fresh backend conn");
dh.stopOfFresh(sourceNames, "fresh backend conn", isForced);
dh.startOfFresh(sourceNames, "fresh backend conn");
}
} catch (Exception e) {
service.writeErrMessage(ErrorCode.ER_YES, "fresh conn with error, use show @@backend to check latest status. Error:" + e.getMessage());
@@ -75,12 +75,18 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler,
/**
 * Called with the acquired backend connection: unbinds the session from its rw group (if any),
 * binds the connection to the session and executes on it.
 *
 * @param conn the acquired backend connection
 */
@Override
public void connectionAcquired(final BackendConnection conn) {
// the connection has been acquired, so the session's binding on the group is released
if (null != rwSplitService.getSession().getRwGroup()) {
rwSplitService.getSession().getRwGroup().unBindRwSplitSession(rwSplitService.getSession());
}
rwSplitService.getSession().bind(conn);
execute(conn);
}
@Override
public void connectionError(Throwable e, Object attachment) {
if (null != rwSplitService.getSession().getRwGroup()) {
rwSplitService.getSession().getRwGroup().unBindRwSplitSession(rwSplitService.getSession());
}
StatisticListener.getInstance().record(rwSplitService, r -> r.onBackendSqlSetRowsAndEnd(0));
loadDataClean();
writeErrorMsg(rwSplitService.nextPacketId(), "can't connect to dbGroup[" + rwSplitService.getUserConfig().getDbGroup());
@@ -7,6 +7,7 @@ package com.actiontech.dble.singleton;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XAAnalysisHandler;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.mysql.xa.XAStateLog;
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.config.model.SystemConfig;
@@ -53,6 +54,7 @@ public final class Scheduler {
scheduledExecutor.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleAtFixedRate(oldDbGroupClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(xaSessionCheck(), 0L, SystemConfig.getInstance().getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(xaLogClean(), 0L, SystemConfig.getInstance().getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(resultSetMapClear(), 0L, SystemConfig.getInstance().getClearBigSQLResultSetMapMs(), TimeUnit.MILLISECONDS);
@@ -113,6 +115,24 @@ public final class Scheduler {
};
}
/**
 * Builds the task that walks IOProcessor.BACKENDS_OLD_GROUP on timerExecutor and removes
 * every dbGroup for which stopOfBackground reports that it was stopped.
 */
private Runnable oldDbGroupClear() {
    return () -> timerExecutor.execute(() -> {
        for (Iterator<PhysicalDbGroup> it = IOProcessor.BACKENDS_OLD_GROUP.iterator(); it.hasNext(); ) {
            PhysicalDbGroup oldGroup = it.next();
            boolean recycled = oldGroup.stopOfBackground("[background task]reload config, recycle old group");
            LOGGER.info("[background task]recycle old group`{}` result{}", oldGroup.getGroupName(), recycled);
            if (recycled) {
                // drop the group from the queue only once it has really been stopped
                it.remove();
            }
        }
    });
}
// XA session check job
private Runnable xaSessionCheck() {
@@ -134,7 +134,7 @@ public class ConfigTest {
Map<String, PhysicalDbGroup> dbGroups = dbConverter.getDbGroupMap();
PhysicalDbGroup pool = dbGroups.get("localhost2");
PhysicalDbInstance source = pool.select(true);
PhysicalDbInstance source = pool.rwSelect(true, null);
Assert.assertNotNull(source);
}