transaction log

This commit is contained in:
yanhuqing666
2016-11-30 17:11:06 +08:00
parent 0414da37b0
commit 4dc558043b
17 changed files with 489 additions and 72 deletions

1
.gitignore vendored
View File

@@ -112,6 +112,7 @@ hs_err_pid*
.DS_Store
/target/
/tmlogs/
/txlog/
src/main/resources/server.xml
src/main/resources/schema.xml
src/main/resources/rule.xml

View File

@@ -71,6 +71,7 @@ import io.mycat.config.loader.zkprocess.comm.ZkParamCfg;
import io.mycat.config.model.SchemaConfig;
import io.mycat.config.model.SystemConfig;
import io.mycat.config.model.TableConfig;
import io.mycat.log.transaction.TxnLogProcessor;
import io.mycat.manager.ManagerConnectionFactory;
import io.mycat.memory.MyCatMemory;
import io.mycat.meta.MySQLTableStructureCheck;
@@ -118,6 +119,7 @@ public class MycatServer {
private final CacheService cacheService;
private Properties dnIndexProperties;
private ProxyMetaManager tmManager;
private TxnLogProcessor txnLogProcessor;
//AIO连接群组
private AsynchronousChannelGroup[] asyncChannelGroups;
@@ -168,7 +170,7 @@ public class MycatServer {
//SQL记录器
this.sqlRecorder = new SQLRecorder(config.getSystem().getSqlRecordCount());
/**
* 是否在线MyCat manager中有命令控制
* | offline | Change MyCat status to OFF |
@@ -411,6 +413,13 @@ public class MycatServer {
server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool);
}
// start transaction SQL log
if (config.getSystem().isRecordTxn()) {
txnLogProcessor = new TxnLogProcessor(bufferPool);
txnLogProcessor.setName("TxnLogProcessor");
txnLogProcessor.start();
}
tmManager = new ProxyMetaManager();
tmManager.init();
@@ -684,6 +693,10 @@ public class MycatServer {
return (isUseZkSwitch&&"true".equalsIgnoreCase(loadZk)) ;
}
public TxnLogProcessor getTxnLogProcessor() {
return txnLogProcessor;
}
public RouteService getRouterService() {
return routerService;
}

View File

@@ -45,6 +45,7 @@ import io.mycat.backend.mysql.nio.handler.TransactionHandler.TxOperation;
import io.mycat.cache.LayerCachePool;
import io.mycat.config.ErrorCode;
import io.mycat.config.MycatConfig;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.memory.unsafe.row.UnsafeRow;
import io.mycat.net.mysql.BinaryRowDataPacket;
import io.mycat.net.mysql.ErrorPacket;
@@ -168,7 +169,11 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
MycatConfig conf = MycatServer.getInstance().getConfig();
startTime = System.currentTimeMillis();
LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
if(!autocommit||session.getSource().isTxstart()||node.isModifySQL()){
sb.append("["+node.getName()+"]"+node.getStatement()).append(";\n");
}
BackendConnection conn = session.getTarget(node);
if (session.tryExistsCon(conn, node)) {
LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave());
@@ -190,6 +195,9 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
}
if(sb.length()>0){
TxnLogHelper.putTxnLog(session.getSource(), sb.toString());
}
}
private void _execute(BackendConnection conn, RouteResultsetNode node) {

View File

@@ -40,6 +40,7 @@ import io.mycat.backend.mysql.LoadDataUtil;
import io.mycat.config.ErrorCode;
import io.mycat.config.MycatConfig;
import io.mycat.config.model.SchemaConfig;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.net.mysql.BinaryRowDataPacket;
import io.mycat.net.mysql.ErrorPacket;
import io.mycat.net.mysql.FieldPacket;
@@ -153,7 +154,11 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
}
conn.setResponseHandler(this);
try {
conn.execute(node, session.getSource(), session.getSource().isAutocommit()&&!session.getSource().isTxstart());
boolean isAutocommit = session.getSource().isAutocommit()&&!session.getSource().isTxstart();
if(!isAutocommit){
TxnLogHelper.putTxnLog(session.getSource(), node.getStatement());
}
conn.execute(node, session.getSource(), isAutocommit);
} catch (Exception e1) {
executeException(conn, e1);
return;

View File

@@ -30,6 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.Iterables;
import io.mycat.backend.datasource.PhysicalDBPool;
/**
@@ -46,7 +47,7 @@ public class DataHostConfig {
private static final Pattern pattern = Pattern.compile("\\s*show\\s+slave\\s+status\\s*",Pattern.CASE_INSENSITIVE);
private static final Pattern patternCluster = Pattern.compile("\\s*show\\s+status\\s+like\\s+'wsrep%'",Pattern.CASE_INSENSITIVE);
private String name;
private int maxCon = SystemConfig.DEFAULT_POOL_SIZE;
private int maxCon = 128;// 保持后端数据通道的默认最大值
private int minCon = 10;
private int balance = PhysicalDBPool.BALANCE_NONE;
private int writeType = PhysicalDBPool.WRITE_ONLYONE_NODE;

View File

@@ -36,31 +36,33 @@ import io.mycat.config.Isolations;
public final class SystemConfig {
public static final String SYS_HOME = "MYCAT_HOME";
public static final long DEFAULT_IDLE_TIMEOUT = 30 * 60 * 1000L;
public static final int SEQUENCEHANDLER_LOCALFILE = 0;
public static final int SEQUENCEHANDLER_MYSQLDB = 1;
public static final int SEQUENCEHANDLER_LOCAL_TIME = 2;
public static final int SEQUENCEHANDLER_ZK_DISTRIBUTED = 3;
public static final int SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT = 4;
/*
* 注意!!! 目前mycat支持的MySQL版本如果后续有新的MySQL版本,请添加到此数组, 对于MySQL的其他分支
* 比如MariaDB目前版本号已经到10.1.x但是其驱动程序仍然兼容官方的MySQL,因此这里版本号只需要MySQL官方的版本号即可。
*/
public static final String[] MySQLVersions = { "5.5", "5.6", "5.7" };
public static final int MUTINODELIMIT_PATCH_SIZE = 100;
public static final int MUTINODELIMIT_SMALL_DATA = 0;
public static final int MUTINODELIMIT_LAR_DATA = 1;
private static final int DEFAULT_PORT = 8066;
private static final int DEFAULT_MANAGER_PORT = 9066;
private static final String DEFAULT_CHARSET = "utf8";
private static final String DEFAULT_SQL_PARSER = "druidparser";// fdbparser, druidparser
private static final short DEFAULT_BUFFER_CHUNK_SIZE = 4096;
private static final int DEFAULT_BUFFER_POOL_PAGE_SIZE = 512*1024*4;
private static final short DEFAULT_BUFFER_POOL_PAGE_NUMBER = 64;
private int processorBufferLocalPercent;
private static final int DEFAULT_PROCESSORS = Runtime.getRuntime().availableProcessors();
private int frontSocketSoRcvbuf = 1024 * 1024;
private int frontSocketSoSndbuf = 4 * 1024 * 1024;
private int backSocketSoRcvbuf = 4 * 1024 * 1024;// mysql 5.6
// net_buffer_length
// defaut 4M
private final static String RESERVED_SYSTEM_MEMORY_BYTES = "384m";
private final static String MEMORY_PAGE_SIZE = "1m";
private final static String SPILLS_FILE_BUFFER_SIZE = "2K";
private final static String DATANODE_SORTED_TEMP_DIR = "datanode";
private int backSocketSoSndbuf = 1024 * 1024;
private int frontSocketNoDelay = 1; // 0=false
private int backSocketNoDelay = 1; // 1=true
public static final int DEFAULT_POOL_SIZE = 128;// 保持后端数据通道的默认最大值
public static final long DEFAULT_IDLE_TIMEOUT = 30 * 60 * 1000L;
private static final long DEFAULT_PROCESSOR_CHECK_PERIOD = 1 * 1000L;
private static final long DEFAULT_DATANODE_IDLE_CHECK_PERIOD = 5 * 60 * 1000L;
private static final long DEFAULT_DATANODE_HEARTBEAT_PERIOD = 10 * 1000L;
@@ -73,6 +75,22 @@ public final class SystemConfig {
private static final int DEFAULT_PARSER_COMMENT_VERSION = 50148;
private static final int DEFAULT_SQL_RECORD_COUNT = 10;
private static final boolean DEFAULT_USE_ZK_SWITCH = true;
private static final String DEFAULT_TRANSACTION_BASE_DIR = "txlogs";
private static final String DEFAULT_TRANSACTION_BASE_NAME = "mycat-tx";
private static final int DEFAULT_TRANSACTION_ROTATE_SIZE = 16;
private final static long CHECKTABLECONSISTENCYPERIOD = 1 * 60 * 1000;
// 全局表一致性检测任务默认24小时调度一次
private static final long DEFAULT_GLOBAL_TABLE_CHECK_PERIOD = 24 * 60 * 60 * 1000L;
private int processorBufferPoolType = 0;
private int processorBufferLocalPercent;
private int frontSocketSoRcvbuf = 1024 * 1024;
private int frontSocketSoSndbuf = 4 * 1024 * 1024;
// mysql 5.6 net_buffer_length defaut 4M
private int backSocketSoRcvbuf = 4 * 1024 * 1024;
private int backSocketSoSndbuf = 1024 * 1024;
private int frontSocketNoDelay = 1; // 0=false
private int backSocketNoDelay = 1; // 1=true
private int maxStringLiteralLength = 65535;
private int frontWriteQueueSize = 2048;
private String bindIp = "0.0.0.0";
@@ -99,16 +117,13 @@ public final class SystemConfig {
private int txIsolation;
private int parserCommentVersion;
private int sqlRecordCount;
private boolean recordTxn = false;
// a page size
private int bufferPoolPageSize;
//minimum allocation unit
private short bufferPoolChunkSize;
// buffer pool page number
private short bufferPoolPageNumber;
//大结果集阈值默认512kb
private int maxResultSet=512*1024;
//大结果集拒绝策略次数过滤限制,默认10次
@@ -122,29 +137,13 @@ public final class SystemConfig {
private int flowControlRejectStrategy=0;
//清理大结果集记录周期
private long clearBigSqLResultSetMapMs=10*60*1000;
private int defaultMaxLimit = DEFAULT_MAX_LIMIT;
public static final int SEQUENCEHANDLER_LOCALFILE = 0;
public static final int SEQUENCEHANDLER_MYSQLDB = 1;
public static final int SEQUENCEHANDLER_LOCAL_TIME = 2;
public static final int SEQUENCEHANDLER_ZK_DISTRIBUTED = 3;
public static final int SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT = 4;
/*
* 注意!!! 目前mycat支持的MySQL版本如果后续有新的MySQL版本,请添加到此数组, 对于MySQL的其他分支
* 比如MariaDB目前版本号已经到10.1.x但是其驱动程序仍然兼容官方的MySQL,因此这里版本号只需要MySQL官方的版本号即可。
*/
public static final String[] MySQLVersions = { "5.5", "5.6", "5.7" };
private int sequnceHandlerType = SEQUENCEHANDLER_LOCALFILE;
private String sqlInterceptor = "io.mycat.server.interceptor.impl.DefaultSqlInterceptor";
private String sqlInterceptorType = "select";
private String sqlInterceptorFile = System.getProperty("user.dir")+"/logs/sql.txt";
public static final int MUTINODELIMIT_SMALL_DATA = 0;
public static final int MUTINODELIMIT_LAR_DATA = 1;
private int mutiNodeLimitType = MUTINODELIMIT_SMALL_DATA;
public static final int MUTINODELIMIT_PATCH_SIZE = 100;
private int mutiNodePatchSize = MUTINODELIMIT_PATCH_SIZE;
private String defaultSqlParser = DEFAULT_SQL_PARSER;
private int usingAIO = 0;
private int packetHeaderSize = 4;
@@ -156,46 +155,30 @@ public final class SystemConfig {
// 是否使用HandshakeV10Packet来与client进行通讯, 1:是 , 0:否(使用HandshakePacket)
// 使用HandshakeV10Packet为的是兼容高版本的jdbc驱动, 后期稳定下来考虑全部采用HandshakeV10Packet来通讯
private int useHandshakeV10 = 0;
private int checkTableConsistency = 0;
private long checkTableConsistencyPeriod = CHECKTABLECONSISTENCYPERIOD;
private final static long CHECKTABLECONSISTENCYPERIOD = 1 * 60 * 1000;
private int processorBufferPoolType = 0;
// 全局表一致性检测任务默认24小时调度一次
private static final long DEFAULT_GLOBAL_TABLE_CHECK_PERIOD = 24 * 60 * 60 * 1000L;
private int useGlobleTableCheck = 1; // 全局表一致性检查开关
private long glableTableCheckPeriod;
/**
* Mycat 使用 Off Heap For Merge/Order/Group/Limit计算相关参数
*/
/**
* 是否启用Off Heap for Merge 1-启用0-不启用
*/
private int useOffHeapForMerge;
/**
*页大小,对应MemoryBlock的大小单位为M
*/
private String memoryPageSize;
/**
* DiskRowWriter写磁盘是临时写Buffer单位为K
*/
private String spillsFileBufferSize;
/**
* 启用结果集流输出不经过merge模块,
*/
private int useStreamOutput;
/**
* 该变量仅在Merge使用On Heap
* 内存方式时起作用如果使用Off Heap内存方式
@@ -206,30 +189,21 @@ public final class SystemConfig {
* 连接操作。
*/
private String systemReserveMemorySize;
private String XARecoveryLogBaseDir;
private String XARecoveryLogBaseName;
private String transactionLogBaseDir;
private String transactionLogBaseName;
private int transactionRatateSize;
/**
* 排序时,内存不够时,将已经排序的结果集
* 写入到临时目录
*/
private String dataNodeSortedTempDir;
/**
* 是否启用zk切换
*/
private boolean useZKSwitch=DEFAULT_USE_ZK_SWITCH;
public String getDefaultSqlParser() {
return defaultSqlParser;
}
public void setDefaultSqlParser(String defaultSqlParser) {
this.defaultSqlParser = defaultSqlParser;
}
public SystemConfig() {
this.serverPort = DEFAULT_PORT;
this.managerPort = DEFAULT_MANAGER_PORT;
@@ -269,8 +243,42 @@ public final class SystemConfig {
this.dataNodeSortedTempDir = System.getProperty("user.dir");
this.XARecoveryLogBaseDir = SystemConfig.getHomePath()+"/tmlogs/";
this.XARecoveryLogBaseName ="tmlog";
this.transactionLogBaseDir = SystemConfig.getHomePath()+File.separatorChar+DEFAULT_TRANSACTION_BASE_DIR;
this.transactionLogBaseName = DEFAULT_TRANSACTION_BASE_NAME;
this.transactionRatateSize = DEFAULT_TRANSACTION_ROTATE_SIZE;
}
public int getTransactionRatateSize() {
return transactionRatateSize;
}
public void setTransactionRatateSize(int transactionRatateSize) {
this.transactionRatateSize = transactionRatateSize;
}
public String getDefaultSqlParser() {
return defaultSqlParser;
}
public void setDefaultSqlParser(String defaultSqlParser) {
this.defaultSqlParser = defaultSqlParser;
}
public String getTransactionLogBaseDir() {
return transactionLogBaseDir;
}
public void setTransactionLogBaseDir(String transactionLogBaseDir) {
this.transactionLogBaseDir = transactionLogBaseDir;
}
public String getTransactionLogBaseName() {
return transactionLogBaseName;
}
public void setTransactionLogBaseName(String transactionLogBaseName) {
this.transactionLogBaseName = transactionLogBaseName;
}
public String getDataNodeSortedTempDir() {
return dataNodeSortedTempDir;
}
@@ -658,6 +666,13 @@ public final class SystemConfig {
this.sqlRecordCount = sqlRecordCount;
}
public boolean isRecordTxn(){
return recordTxn;
}
public void setRecordTxn(boolean recordTxn){
this.recordTxn = recordTxn;
}
public short getBufferPoolChunkSize() {
return bufferPoolChunkSize;

View File

@@ -0,0 +1,166 @@
package io.mycat.log;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class DailyRotateLogStore {
private static final String FILE_SEPARATOR = String.valueOf(File.separatorChar);
private String prefix;
private String suffix;
private String fileName;
private final String mode;
private long maxFileSize;
private int currentIndex;
private long nextCheckTime;
private Calendar cal;
private Date now;
private SimpleDateFormat dateFormat;
private String dateString;
private long pos;
private FileChannel channel;
/**
*
* @param baseDir
* @param baseName
* @param suffix
* @param rolateSize unit:M
*/
public DailyRotateLogStore(String baseDir, String baseName, String suffix, int rolateSize) {
if (!baseDir.endsWith(FILE_SEPARATOR)) {
baseDir += FILE_SEPARATOR;
}
this.prefix = baseDir+baseName;
this.suffix = suffix;
this.fileName = this.prefix + "." + suffix;
this.mode = "rw";
this.maxFileSize = rolateSize * 1024 * 1024;
this.nextCheckTime = System.currentTimeMillis() - 1;
this.cal = Calendar.getInstance();
this.dateFormat = new SimpleDateFormat("yyyy-MM-dd");
this.dateString = dateFormat.format(new Date());
}
public void open() throws IOException {
File f = new File(fileName);
createDirectorys(f.getAbsoluteFile().getParentFile().getAbsolutePath());
if (now == null) {
if (f.exists() && System.currentTimeMillis() > f.lastModified()) {
now = new Date(f.lastModified());
} else {
now = new Date();
}
}
nextCheckTime = calculateNextCheckTime(now);
dateString = dateFormat.format(now);
if (f.exists()) {
indexRollOver();
} else {
channel = new RandomAccessFile(fileName, mode).getChannel();
}
}
public void close() {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
}
}
}
public void createDirectorys(String dir) {
if (dir != null) {
File f = new File(dir);
if (!f.exists()) {
String parent = f.getParent();
createDirectorys(parent);
f.mkdir();
}
}
}
public void write(ByteBuffer buffer) throws IOException {
long n = System.currentTimeMillis();
if (n >= nextCheckTime) {
now.setTime(n);
nextCheckTime = calculateNextCheckTime(now);
dateRollOver();
}
do {
if (maxFileSize > pos + buffer.remaining()) {
pos += channel.write(buffer);
} else if (maxFileSize == pos) {
// create new file
indexRollOver();
pos = 0;
} else {
int length = (int) (maxFileSize - pos);
int limit = buffer.limit();
buffer.limit(length+buffer.position());
pos += channel.write(buffer);
buffer.limit(limit);
}
} while (buffer.hasRemaining());
}
public void force(boolean metaData) throws IOException {
if (channel != null) {
channel.force(metaData);
}
}
private void dateRollOver() throws IOException {
if (new File(fileName).length() > 0)
indexRollOver();
dateString = dateFormat.format(now);
currentIndex = 0;
}
@SuppressWarnings("resource")
private void indexRollOver() throws IOException {
currentIndex++;
File file;
File target;
while (true) {
target = new File(buildRollFileName(currentIndex));
if (target.exists()) {
currentIndex++;
} else {
break;
}
}
force(false);
close();
file = new File(fileName);
file.renameTo(target);
channel = new RandomAccessFile(fileName, mode).getChannel();
}
private String buildRollFileName(int index) {
StringBuffer buf = new StringBuffer(prefix);
buf.append('_');
buf.append(dateString);
buf.append('.');
buf.append(index);
buf.append('.');
buf.append(suffix);
return buf.toString();
}
private long calculateNextCheckTime(Date now) {
cal.setTime(now);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
cal.add(Calendar.DATE, 1);
return cal.getTimeInMillis();
}
}

View File

@@ -0,0 +1,76 @@
package io.mycat.log.transaction;
public class TxnBinaryLog {
private String user;
private String host;
private String schema;
private long xid;
private String executeTime;
private String query;
private long connid;
public long getConnid() {
return connid;
}
public void setConnid(long connid) {
this.connid = connid;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public long getXid() {
return xid;
}
public void setXid(long xid) {
this.xid = xid;
}
public String getExecuteTime() {
return executeTime;
}
public void setExecuteTime(String executeTime) {
this.executeTime = executeTime;
}
public String getQuery() {
return query;
}
public void setQuery(String query) {
this.query = query;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(executeTime).append(", ConnID:").append(connid).append(", XID:").append(xid).append(", ").append("MySQL user ").append("'").append(user)
.append("'").append("@'").append(host).append("', ").append(" Current schema `").append(schema)
.append("`, ").append("Current query ").append("\n").append(query).append("\n");
return builder.toString();
}
}

View File

@@ -0,0 +1,12 @@
package io.mycat.log.transaction;
import io.mycat.MycatServer;
import io.mycat.server.ServerConnection;
public class TxnLogHelper {
public static void putTxnLog(ServerConnection c, String sql){
if (MycatServer.getInstance().getConfig().getSystem().isRecordTxn()) {
MycatServer.getInstance().getTxnLogProcessor().putTxnLog(c, sql);
}
}
}

View File

@@ -0,0 +1,97 @@
package io.mycat.log.transaction;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.apache.log4j.helpers.ISO8601DateFormat;
import io.mycat.MycatServer;
import io.mycat.buffer.BufferPool;
import io.mycat.config.MycatConfig;
import io.mycat.config.model.SystemConfig;
import io.mycat.log.DailyRotateLogStore;
import io.mycat.server.ServerConnection;
import io.mycat.util.TimeUtil;
public class TxnLogProcessor extends Thread {
private static final Logger logger = Logger.getLogger(TxnLogProcessor.class);
private final DateFormat dateFormat;
private BlockingQueue<TxnBinaryLog> queue;
private DailyRotateLogStore store;
public TxnLogProcessor(BufferPool bufferPool) {
this.dateFormat = new ISO8601DateFormat();
this.queue = new LinkedBlockingQueue<TxnBinaryLog>(256);
MycatConfig mycatconfig = MycatServer.getInstance().getConfig();
SystemConfig systemConfig = mycatconfig.getSystem();
this.store = new DailyRotateLogStore(systemConfig.getTransactionLogBaseDir(), systemConfig.getTransactionLogBaseName(),"log",systemConfig.getTransactionRatateSize());
}
@Override
public void run() {
TxnBinaryLog log = null;
long flushTime = TimeUtil.currentTimeMillis();
try {
store.open();
for (;;) {
while ((log = queue.poll()) != null) {
writeLog(log);
}
long interval = TimeUtil.currentTimeMillis() - flushTime;
if (interval > 1000) {
store.force(false);
flushTime = TimeUtil.currentTimeMillis();
}
try {
log = queue.take();
} catch (InterruptedException e) {
}
writeLog(log);
}
} catch (IOException e) {
logger.warn("transaction log error:", e);
store.close();
}
}
private void writeLog(TxnBinaryLog log) throws IOException {
if (log == null)
return;
byte[] data = null;
try {
data = log.toString().getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
return;
}
ByteBuffer buffer = ByteBuffer.wrap(data);
store.write(buffer);
}
public void putTxnLog(ServerConnection c, String sql) {
TxnBinaryLog log = new TxnBinaryLog();
log.setUser(c.getUser());
log.setHost(c.getHost());
log.setSchema(c.getSchema());
Date date = new Date();
date.setTime(System.currentTimeMillis());
log.setExecuteTime(dateFormat.format(date));
log.setConnid(c.getId());
if (c.isTxstart() || !c.isAutocommit()) {
log.setXid(c.getXid());
} else {
log.setXid(c.getAndIncrementXid());
}
log.setQuery(sql);
try {
this.queue.put(log);
} catch (InterruptedException e) {
// ignore
}
}
}

View File

@@ -25,6 +25,7 @@ package io.mycat.server;
import java.io.IOException;
import java.nio.channels.NetworkChannel;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,12 +64,20 @@ public class ServerConnection extends FrontendConnection {
* 标志是否执行了lock tables语句并处于lock状态
*/
private volatile boolean isLocked = false;
private AtomicLong xid;
public long getAndIncrementXid(){
return xid.getAndIncrement();
}
public long getXid(){
return xid.get();
}
public ServerConnection(NetworkChannel channel)
throws IOException {
super(channel);
this.txInterrupted = false;
this.autocommit = true;
this.xid = new AtomicLong(1);
}
@Override

View File

@@ -71,7 +71,6 @@ public class ServerQueryHandler implements FrontendQueryHandler {
//
int rs = ServerParse.parse(sql);
int sqlType = rs & 0xff;
switch (sqlType) {
//explain sql
case ServerParse.EXPLAIN:
@@ -112,10 +111,10 @@ public class ServerQueryHandler implements FrontendQueryHandler {
UseHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.COMMIT:
CommitHandler.handle(c);
CommitHandler.handle(sql, c);
break;
case ServerParse.ROLLBACK:
RollBackHandler.handle(c);
RollBackHandler.handle(sql, c);
break;
case ServerParse.HELP:
LOGGER.warn(new StringBuilder().append("Unsupported command:").append(sql).toString());

View File

@@ -23,6 +23,7 @@
*/
package io.mycat.server.handler;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.net.mysql.OkPacket;
import io.mycat.server.ServerConnection;
@@ -31,10 +32,13 @@ public final class BeginHandler {
if (c.isTxstart() || !c.isAutocommit()) {
c.setTxstart(true);
c.commit();
TxnLogHelper.putTxnLog(c, "commit[because of begin]");
c.getAndIncrementXid();
} else {
c.setTxstart(true);
c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
}
TxnLogHelper.putTxnLog(c, stmt);
}
}

View File

@@ -1,10 +1,13 @@
package io.mycat.server.handler;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.server.ServerConnection;
public final class CommitHandler {
public static void handle(ServerConnection c) {
c.commit();
c.setTxstart(false);
}
public static void handle(String stmt, ServerConnection c) {
c.commit();
TxnLogHelper.putTxnLog(c, stmt);
c.setTxstart(false);
c.getAndIncrementXid();
}
}

View File

@@ -1,10 +1,12 @@
package io.mycat.server.handler;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.server.ServerConnection;
public final class RollBackHandler {
public static void handle( ServerConnection c) {
public static void handle(String stmt, ServerConnection c) {
c.rollback();
c.setTxstart(false);
TxnLogHelper.putTxnLog(c, stmt);
}
}

View File

@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import io.mycat.config.ErrorCode;
import io.mycat.config.Isolations;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.net.mysql.OkPacket;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParseSet;
@@ -68,13 +69,16 @@ public final class SetHandler {
c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
} else {
c.commit();
TxnLogHelper.putTxnLog(c, "commit[because of "+stmt+"]");
c.setTxstart(false);
c.getAndIncrementXid();
c.setAutocommit(true);
}
break;
case AUTOCOMMIT_OFF: {
if (c.isAutocommit()) {
c.setAutocommit(false);
TxnLogHelper.putTxnLog(c, stmt);
}
c.write(c.writeToBuffer(AC_OFF, c.allocate()));
break;

View File

@@ -23,6 +23,7 @@
*/
package io.mycat.server.handler;
import io.mycat.log.transaction.TxnLogHelper;
import io.mycat.net.mysql.OkPacket;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;
@@ -42,6 +43,7 @@ public final class StartHandler {
c.setTxstart(true);
c.writeToBuffer(OkPacket.OK, c.allocate());
}
TxnLogHelper.putTxnLog(c, stmt);
break;
default:
c.execute(stmt, ServerParse.START);