From 4dc558043b661e88b81fb80fee3e94a030bdcf69 Mon Sep 17 00:00:00 2001 From: yanhuqing666 Date: Wed, 30 Nov 2016 17:11:06 +0800 Subject: [PATCH] transaction log --- .gitignore | 1 + src/main/java/io/mycat/MycatServer.java | 15 +- .../nio/handler/MultiNodeQueryHandler.java | 8 + .../mysql/nio/handler/SingleNodeHandler.java | 7 +- .../io/mycat/config/model/DataHostConfig.java | 3 +- .../io/mycat/config/model/SystemConfig.java | 137 ++++++++------- .../io/mycat/log/DailyRotateLogStore.java | 166 ++++++++++++++++++ .../mycat/log/transaction/TxnBinaryLog.java | 76 ++++++++ .../mycat/log/transaction/TxnLogHelper.java | 12 ++ .../log/transaction/TxnLogProcessor.java | 97 ++++++++++ .../io/mycat/server/ServerConnection.java | 9 + .../io/mycat/server/ServerQueryHandler.java | 5 +- .../io/mycat/server/handler/BeginHandler.java | 4 + .../mycat/server/handler/CommitHandler.java | 11 +- .../mycat/server/handler/RollBackHandler.java | 4 +- .../io/mycat/server/handler/SetHandler.java | 4 + .../io/mycat/server/handler/StartHandler.java | 2 + 17 files changed, 489 insertions(+), 72 deletions(-) create mode 100644 src/main/java/io/mycat/log/DailyRotateLogStore.java create mode 100644 src/main/java/io/mycat/log/transaction/TxnBinaryLog.java create mode 100644 src/main/java/io/mycat/log/transaction/TxnLogHelper.java create mode 100644 src/main/java/io/mycat/log/transaction/TxnLogProcessor.java diff --git a/.gitignore b/.gitignore index 0d6dce9f5..b2cb9c6e3 100644 --- a/.gitignore +++ b/.gitignore @@ -112,6 +112,7 @@ hs_err_pid* .DS_Store /target/ /tmlogs/ +/txlog/ src/main/resources/server.xml src/main/resources/schema.xml src/main/resources/rule.xml diff --git a/src/main/java/io/mycat/MycatServer.java b/src/main/java/io/mycat/MycatServer.java index 43e7df360..05e0975cb 100644 --- a/src/main/java/io/mycat/MycatServer.java +++ b/src/main/java/io/mycat/MycatServer.java @@ -71,6 +71,7 @@ import io.mycat.config.loader.zkprocess.comm.ZkParamCfg; import io.mycat.config.model.SchemaConfig; import io.mycat.config.model.SystemConfig; import io.mycat.config.model.TableConfig; +import io.mycat.log.transaction.TxnLogProcessor; import io.mycat.manager.ManagerConnectionFactory; import io.mycat.memory.MyCatMemory; import io.mycat.meta.MySQLTableStructureCheck; @@ -118,6 +119,7 @@ public class MycatServer { private final CacheService cacheService; private Properties dnIndexProperties; private ProxyMetaManager tmManager; + private TxnLogProcessor txnLogProcessor; //AIO连接群组 private AsynchronousChannelGroup[] asyncChannelGroups; @@ -168,7 +170,7 @@ public class MycatServer { //SQL记录器 this.sqlRecorder = new SQLRecorder(config.getSystem().getSqlRecordCount()); - + /** * 是否在线,MyCat manager中有命令控制 * | offline | Change MyCat status to OFF | @@ -411,6 +413,13 @@ public class MycatServer { server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool); } + + // start transaction SQL log + if (config.getSystem().isRecordTxn()) { + txnLogProcessor = new TxnLogProcessor(bufferPool); + txnLogProcessor.setName("TxnLogProcessor"); + txnLogProcessor.start(); + } tmManager = new ProxyMetaManager(); tmManager.init(); @@ -684,6 +693,10 @@ public class MycatServer { return (isUseZkSwitch&&"true".equalsIgnoreCase(loadZk)) ; } + public TxnLogProcessor getTxnLogProcessor() { + return txnLogProcessor; + } + public RouteService getRouterService() { return routerService; } diff --git a/src/main/java/io/mycat/backend/mysql/nio/handler/MultiNodeQueryHandler.java b/src/main/java/io/mycat/backend/mysql/nio/handler/MultiNodeQueryHandler.java index b6a7ed28c..cd4b5a0c0 100644 --- a/src/main/java/io/mycat/backend/mysql/nio/handler/MultiNodeQueryHandler.java +++ b/src/main/java/io/mycat/backend/mysql/nio/handler/MultiNodeQueryHandler.java @@ -45,6 +45,7 @@ import io.mycat.backend.mysql.nio.handler.TransactionHandler.TxOperation; import io.mycat.cache.LayerCachePool; import io.mycat.config.ErrorCode; import io.mycat.config.MycatConfig; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.memory.unsafe.row.UnsafeRow; import io.mycat.net.mysql.BinaryRowDataPacket; import io.mycat.net.mysql.ErrorPacket; @@ -168,7 +169,11 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR MycatConfig conf = MycatServer.getInstance().getConfig(); startTime = System.currentTimeMillis(); LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave()); + StringBuilder sb = new StringBuilder(); for (final RouteResultsetNode node : rrs.getNodes()) { + if(!autocommit||session.getSource().isTxstart()||node.isModifySQL()){ + sb.append("["+node.getName()+"]"+node.getStatement()).append(";\n"); + } BackendConnection conn = session.getTarget(node); if (session.tryExistsCon(conn, node)) { LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave()); @@ -190,6 +195,9 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR } } + if(sb.length()>0){ + TxnLogHelper.putTxnLog(session.getSource(), sb.toString()); + } } private void _execute(BackendConnection conn, RouteResultsetNode node) { diff --git a/src/main/java/io/mycat/backend/mysql/nio/handler/SingleNodeHandler.java b/src/main/java/io/mycat/backend/mysql/nio/handler/SingleNodeHandler.java index 0732d5060..689cdcaad 100644 --- a/src/main/java/io/mycat/backend/mysql/nio/handler/SingleNodeHandler.java +++ b/src/main/java/io/mycat/backend/mysql/nio/handler/SingleNodeHandler.java @@ -40,6 +40,7 @@ import io.mycat.backend.mysql.LoadDataUtil; import io.mycat.config.ErrorCode; import io.mycat.config.MycatConfig; import io.mycat.config.model.SchemaConfig; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.net.mysql.BinaryRowDataPacket; import io.mycat.net.mysql.ErrorPacket; import io.mycat.net.mysql.FieldPacket; @@ -153,7 +154,11 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl } conn.setResponseHandler(this); try { - conn.execute(node, session.getSource(), session.getSource().isAutocommit()&&!session.getSource().isTxstart()); + boolean isAutocommit = session.getSource().isAutocommit()&&!session.getSource().isTxstart(); + if(!isAutocommit){ + TxnLogHelper.putTxnLog(session.getSource(), node.getStatement()); + } + conn.execute(node, session.getSource(), isAutocommit); } catch (Exception e1) { executeException(conn, e1); return; diff --git a/src/main/java/io/mycat/config/model/DataHostConfig.java b/src/main/java/io/mycat/config/model/DataHostConfig.java index 6a014d010..0075b885e 100644 --- a/src/main/java/io/mycat/config/model/DataHostConfig.java +++ b/src/main/java/io/mycat/config/model/DataHostConfig.java @@ -30,6 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.common.collect.Iterables; + import io.mycat.backend.datasource.PhysicalDBPool; /** @@ -46,7 +47,7 @@ public class DataHostConfig { private static final Pattern pattern = Pattern.compile("\\s*show\\s+slave\\s+status\\s*",Pattern.CASE_INSENSITIVE); private static final Pattern patternCluster = Pattern.compile("\\s*show\\s+status\\s+like\\s+'wsrep%'",Pattern.CASE_INSENSITIVE); private String name; - private int maxCon = SystemConfig.DEFAULT_POOL_SIZE; + private int maxCon = 128;// 保持后端数据通道的默认最大值 private int minCon = 10; private int balance = PhysicalDBPool.BALANCE_NONE; private int writeType = PhysicalDBPool.WRITE_ONLYONE_NODE; diff --git a/src/main/java/io/mycat/config/model/SystemConfig.java b/src/main/java/io/mycat/config/model/SystemConfig.java index ce2ced3d4..ab4000bd5 100644 --- a/src/main/java/io/mycat/config/model/SystemConfig.java +++ b/src/main/java/io/mycat/config/model/SystemConfig.java @@ -36,31 +36,33 @@ import io.mycat.config.Isolations; public final class SystemConfig { public static final String SYS_HOME = "MYCAT_HOME"; + public static final long DEFAULT_IDLE_TIMEOUT = 30 * 60 * 1000L; + public static final int SEQUENCEHANDLER_LOCALFILE = 0; + public static final int SEQUENCEHANDLER_MYSQLDB = 1; + public static final int SEQUENCEHANDLER_LOCAL_TIME = 2; + public static final int SEQUENCEHANDLER_ZK_DISTRIBUTED = 3; + public static final int SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT = 4; + /* + * 注意!!! 目前mycat支持的MySQL版本,如果后续有新的MySQL版本,请添加到此数组, 对于MySQL的其他分支, + * 比如MariaDB目前版本号已经到10.1.x,但是其驱动程序仍然兼容官方的MySQL,因此这里版本号只需要MySQL官方的版本号即可。 + */ + public static final String[] MySQLVersions = { "5.5", "5.6", "5.7" }; + public static final int MUTINODELIMIT_PATCH_SIZE = 100; + public static final int MUTINODELIMIT_SMALL_DATA = 0; + public static final int MUTINODELIMIT_LAR_DATA = 1; + private static final int DEFAULT_PORT = 8066; private static final int DEFAULT_MANAGER_PORT = 9066; private static final String DEFAULT_CHARSET = "utf8"; - private static final String DEFAULT_SQL_PARSER = "druidparser";// fdbparser, druidparser private static final short DEFAULT_BUFFER_CHUNK_SIZE = 4096; private static final int DEFAULT_BUFFER_POOL_PAGE_SIZE = 512*1024*4; private static final short DEFAULT_BUFFER_POOL_PAGE_NUMBER = 64; - private int processorBufferLocalPercent; private static final int DEFAULT_PROCESSORS = Runtime.getRuntime().availableProcessors(); - private int frontSocketSoRcvbuf = 1024 * 1024; - private int frontSocketSoSndbuf = 4 * 1024 * 1024; - private int backSocketSoRcvbuf = 4 * 1024 * 1024;// mysql 5.6 - // net_buffer_length - // defaut 4M - private final static String RESERVED_SYSTEM_MEMORY_BYTES = "384m"; private final static String MEMORY_PAGE_SIZE = "1m"; private final static String SPILLS_FILE_BUFFER_SIZE = "2K"; private final static String DATANODE_SORTED_TEMP_DIR = "datanode"; - private int backSocketSoSndbuf = 1024 * 1024; - private int frontSocketNoDelay = 1; // 0=false - private int backSocketNoDelay = 1; // 1=true - public static final int DEFAULT_POOL_SIZE = 128;// 保持后端数据通道的默认最大值 - public static final long DEFAULT_IDLE_TIMEOUT = 30 * 60 * 1000L; private static final long DEFAULT_PROCESSOR_CHECK_PERIOD = 1 * 1000L; private static final long DEFAULT_DATANODE_IDLE_CHECK_PERIOD = 5 * 60 * 1000L; private static final long DEFAULT_DATANODE_HEARTBEAT_PERIOD = 10 * 1000L; @@ -73,6 +75,22 @@ public final class SystemConfig { private static final int DEFAULT_PARSER_COMMENT_VERSION = 50148; private static final int DEFAULT_SQL_RECORD_COUNT = 10; private static final boolean DEFAULT_USE_ZK_SWITCH = true; + private static final String DEFAULT_TRANSACTION_BASE_DIR = "txlogs"; + private static final String DEFAULT_TRANSACTION_BASE_NAME = "mycat-tx"; + private static final int DEFAULT_TRANSACTION_ROTATE_SIZE = 16; + private final static long CHECKTABLECONSISTENCYPERIOD = 1 * 60 * 1000; + // 全局表一致性检测任务,默认24小时调度一次 + private static final long DEFAULT_GLOBAL_TABLE_CHECK_PERIOD = 24 * 60 * 60 * 1000L; + + private int processorBufferPoolType = 0; + private int processorBufferLocalPercent; + private int frontSocketSoRcvbuf = 1024 * 1024; + private int frontSocketSoSndbuf = 4 * 1024 * 1024; + // mysql 5.6 net_buffer_length defaut 4M + private int backSocketSoRcvbuf = 4 * 1024 * 1024; + private int backSocketSoSndbuf = 1024 * 1024; + private int frontSocketNoDelay = 1; // 0=false + private int backSocketNoDelay = 1; // 1=true private int maxStringLiteralLength = 65535; private int frontWriteQueueSize = 2048; private String bindIp = "0.0.0.0"; @@ -99,16 +117,13 @@ public final class SystemConfig { private int txIsolation; private int parserCommentVersion; private int sqlRecordCount; - + private boolean recordTxn = false; // a page size private int bufferPoolPageSize; - //minimum allocation unit private short bufferPoolChunkSize; - // buffer pool page number private short bufferPoolPageNumber; - //大结果集阈值,默认512kb private int maxResultSet=512*1024; //大结果集拒绝策略次数过滤限制,默认10次 @@ -122,29 +137,13 @@ public final class SystemConfig { private int flowControlRejectStrategy=0; //清理大结果集记录周期 private long clearBigSqLResultSetMapMs=10*60*1000; - private int defaultMaxLimit = DEFAULT_MAX_LIMIT; - public static final int SEQUENCEHANDLER_LOCALFILE = 0; - public static final int SEQUENCEHANDLER_MYSQLDB = 1; - public static final int SEQUENCEHANDLER_LOCAL_TIME = 2; - public static final int SEQUENCEHANDLER_ZK_DISTRIBUTED = 3; - public static final int SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT = 4; - /* - * 注意!!! 目前mycat支持的MySQL版本,如果后续有新的MySQL版本,请添加到此数组, 对于MySQL的其他分支, - * 比如MariaDB目前版本号已经到10.1.x,但是其驱动程序仍然兼容官方的MySQL,因此这里版本号只需要MySQL官方的版本号即可。 - */ - public static final String[] MySQLVersions = { "5.5", "5.6", "5.7" }; private int sequnceHandlerType = SEQUENCEHANDLER_LOCALFILE; private String sqlInterceptor = "io.mycat.server.interceptor.impl.DefaultSqlInterceptor"; private String sqlInterceptorType = "select"; private String sqlInterceptorFile = System.getProperty("user.dir")+"/logs/sql.txt"; - public static final int MUTINODELIMIT_SMALL_DATA = 0; - public static final int MUTINODELIMIT_LAR_DATA = 1; private int mutiNodeLimitType = MUTINODELIMIT_SMALL_DATA; - - public static final int MUTINODELIMIT_PATCH_SIZE = 100; private int mutiNodePatchSize = MUTINODELIMIT_PATCH_SIZE; - private String defaultSqlParser = DEFAULT_SQL_PARSER; private int usingAIO = 0; private int packetHeaderSize = 4; @@ -156,46 +155,30 @@ public final class SystemConfig { // 是否使用HandshakeV10Packet来与client进行通讯, 1:是 , 0:否(使用HandshakePacket) // 使用HandshakeV10Packet为的是兼容高版本的jdbc驱动, 后期稳定下来考虑全部采用HandshakeV10Packet来通讯 private int useHandshakeV10 = 0; - - private int checkTableConsistency = 0; private long checkTableConsistencyPeriod = CHECKTABLECONSISTENCYPERIOD; - private final static long CHECKTABLECONSISTENCYPERIOD = 1 * 60 * 1000; - - private int processorBufferPoolType = 0; - - // 全局表一致性检测任务,默认24小时调度一次 - private static final long DEFAULT_GLOBAL_TABLE_CHECK_PERIOD = 24 * 60 * 60 * 1000L; private int useGlobleTableCheck = 1; // 全局表一致性检查开关 - private long glableTableCheckPeriod; - /** * Mycat 使用 Off Heap For Merge/Order/Group/Limit计算相关参数 */ - /** * 是否启用Off Heap for Merge 1-启用,0-不启用 */ private int useOffHeapForMerge; - /** *页大小,对应MemoryBlock的大小,单位为M */ private String memoryPageSize; - - /** * DiskRowWriter写磁盘是临时写Buffer,单位为K */ private String spillsFileBufferSize; - /** * 启用结果集流输出,不经过merge模块, */ private int useStreamOutput; - /** * 该变量仅在Merge使用On Heap * 内存方式时起作用,如果使用Off Heap内存方式 @@ -206,30 +189,21 @@ public final class SystemConfig { * 连接操作。 */ private String systemReserveMemorySize; - private String XARecoveryLogBaseDir; - private String XARecoveryLogBaseName; - + private String transactionLogBaseDir; + private String transactionLogBaseName; + private int transactionRatateSize; /** * 排序时,内存不够时,将已经排序的结果集 * 写入到临时目录 */ private String dataNodeSortedTempDir; - /** * 是否启用zk切换 */ private boolean useZKSwitch=DEFAULT_USE_ZK_SWITCH; - public String getDefaultSqlParser() { - return defaultSqlParser; - } - - public void setDefaultSqlParser(String defaultSqlParser) { - this.defaultSqlParser = defaultSqlParser; - } - public SystemConfig() { this.serverPort = DEFAULT_PORT; this.managerPort = DEFAULT_MANAGER_PORT; @@ -269,8 +243,42 @@ public final class SystemConfig { this.dataNodeSortedTempDir = System.getProperty("user.dir"); this.XARecoveryLogBaseDir = SystemConfig.getHomePath()+"/tmlogs/"; this.XARecoveryLogBaseName ="tmlog"; + this.transactionLogBaseDir = SystemConfig.getHomePath()+File.separatorChar+DEFAULT_TRANSACTION_BASE_DIR; + this.transactionLogBaseName = DEFAULT_TRANSACTION_BASE_NAME; + this.transactionRatateSize = DEFAULT_TRANSACTION_ROTATE_SIZE; } + public int getTransactionRatateSize() { + return transactionRatateSize; + } + + public void setTransactionRatateSize(int transactionRatateSize) { + this.transactionRatateSize = transactionRatateSize; + } + + public String getDefaultSqlParser() { + return defaultSqlParser; + } + + public void setDefaultSqlParser(String defaultSqlParser) { + this.defaultSqlParser = defaultSqlParser; + } + + public String getTransactionLogBaseDir() { + return transactionLogBaseDir; + } + + public void setTransactionLogBaseDir(String transactionLogBaseDir) { + this.transactionLogBaseDir = transactionLogBaseDir; + } + + public String getTransactionLogBaseName() { + return transactionLogBaseName; + } + + public void setTransactionLogBaseName(String transactionLogBaseName) { + this.transactionLogBaseName = transactionLogBaseName; + } public String getDataNodeSortedTempDir() { return dataNodeSortedTempDir; } @@ -658,6 +666,13 @@ public final class SystemConfig { this.sqlRecordCount = sqlRecordCount; } + public boolean isRecordTxn(){ + return recordTxn; + } + + public void setRecordTxn(boolean recordTxn){ + this.recordTxn = recordTxn; + } public short getBufferPoolChunkSize() { return bufferPoolChunkSize; diff --git a/src/main/java/io/mycat/log/DailyRotateLogStore.java b/src/main/java/io/mycat/log/DailyRotateLogStore.java new file mode 100644 index 000000000..999375cd2 --- /dev/null +++ b/src/main/java/io/mycat/log/DailyRotateLogStore.java @@ -0,0 +1,166 @@ +package io.mycat.log; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; + +public class DailyRotateLogStore { + private static final String FILE_SEPARATOR = String.valueOf(File.separatorChar); + private String prefix; + private String suffix; + private String fileName; + private final String mode; + private long maxFileSize; + private int currentIndex; + private long nextCheckTime; + private Calendar cal; + private Date now; + private SimpleDateFormat dateFormat; + private String dateString; + private long pos; + private FileChannel channel; + /** + * + * @param baseDir + * @param baseName + * @param suffix + * @param rolateSize unit:M + */ + public DailyRotateLogStore(String baseDir, String baseName, String suffix, int rolateSize) { + if (!baseDir.endsWith(FILE_SEPARATOR)) { + baseDir += FILE_SEPARATOR; + } + this.prefix = baseDir+baseName; + this.suffix = suffix; + this.fileName = this.prefix + "." + suffix; + this.mode = "rw"; + this.maxFileSize = rolateSize * 1024 * 1024; + this.nextCheckTime = System.currentTimeMillis() - 1; + this.cal = Calendar.getInstance(); + this.dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + this.dateString = dateFormat.format(new Date()); + } + + public void open() throws IOException { + File f = new File(fileName); + createDirectorys(f.getAbsoluteFile().getParentFile().getAbsolutePath()); + if (now == null) { + if (f.exists() && System.currentTimeMillis() > f.lastModified()) { + now = new Date(f.lastModified()); + } else { + now = new Date(); + } + } + nextCheckTime = calculateNextCheckTime(now); + dateString = dateFormat.format(now); + if (f.exists()) { + indexRollOver(); + } else { + channel = new RandomAccessFile(fileName, mode).getChannel(); + } + } + + public void close() { + if (channel != null) { + try { + channel.close(); + } catch (IOException e) { + } + } + } + + public void createDirectorys(String dir) { + if (dir != null) { + File f = new File(dir); + if (!f.exists()) { + String parent = f.getParent(); + createDirectorys(parent); + f.mkdir(); + } + } + } + + public void write(ByteBuffer buffer) throws IOException { + long n = System.currentTimeMillis(); + if (n >= nextCheckTime) { + now.setTime(n); + nextCheckTime = calculateNextCheckTime(now); + dateRollOver(); + } + + do { + if (maxFileSize > pos + buffer.remaining()) { + pos += channel.write(buffer); + } else if (maxFileSize == pos) { + // create new file + indexRollOver(); + pos = 0; + } else { + int length = (int) (maxFileSize - pos); + int limit = buffer.limit(); + buffer.limit(length+buffer.position()); + pos += channel.write(buffer); + buffer.limit(limit); + } + } while (buffer.hasRemaining()); + } + + public void force(boolean metaData) throws IOException { + if (channel != null) { + channel.force(metaData); + } + } + + private void dateRollOver() throws IOException { + if (new File(fileName).length() > 0) + indexRollOver(); + dateString = dateFormat.format(now); + currentIndex = 0; + } + + @SuppressWarnings("resource") + private void indexRollOver() throws IOException { + currentIndex++; + File file; + File target; + while (true) { + target = new File(buildRollFileName(currentIndex)); + if (target.exists()) { + currentIndex++; + } else { + break; + } + } + force(false); + close(); + file = new File(fileName); + file.renameTo(target); + channel = new RandomAccessFile(fileName, mode).getChannel(); + } + + private String buildRollFileName(int index) { + StringBuffer buf = new StringBuffer(prefix); + buf.append('_'); + buf.append(dateString); + buf.append('.'); + buf.append(index); + buf.append('.'); + buf.append(suffix); + return buf.toString(); + } + + private long calculateNextCheckTime(Date now) { + cal.setTime(now); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + cal.add(Calendar.DATE, 1); + return cal.getTimeInMillis(); + } +} diff --git a/src/main/java/io/mycat/log/transaction/TxnBinaryLog.java b/src/main/java/io/mycat/log/transaction/TxnBinaryLog.java new file mode 100644 index 000000000..0cf2fc227 --- /dev/null +++ b/src/main/java/io/mycat/log/transaction/TxnBinaryLog.java @@ -0,0 +1,76 @@ +package io.mycat.log.transaction; + +public class TxnBinaryLog { + private String user; + private String host; + private String schema; + private long xid; + private String executeTime; + private String query; + private long connid; + + public long getConnid() { + return connid; + } + + public void setConnid(long connid) { + this.connid = connid; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public long getXid() { + return xid; + } + + public void setXid(long xid) { + this.xid = xid; + } + + public String getExecuteTime() { + return executeTime; + } + + public void setExecuteTime(String executeTime) { + this.executeTime = executeTime; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(executeTime).append(", ConnID:").append(connid).append(", XID:").append(xid).append(", ").append("MySQL user ").append("'").append(user) + .append("'").append("@'").append(host).append("', ").append(" Current schema `").append(schema) + .append("`, ").append("Current query ").append("\n").append(query).append("\n"); + return builder.toString(); + } +} diff --git a/src/main/java/io/mycat/log/transaction/TxnLogHelper.java b/src/main/java/io/mycat/log/transaction/TxnLogHelper.java new file mode 100644 index 000000000..4077435a4 --- /dev/null +++ b/src/main/java/io/mycat/log/transaction/TxnLogHelper.java @@ -0,0 +1,12 @@ +package io.mycat.log.transaction; + +import io.mycat.MycatServer; +import io.mycat.server.ServerConnection; + +public class TxnLogHelper { + public static void putTxnLog(ServerConnection c, String sql){ + if (MycatServer.getInstance().getConfig().getSystem().isRecordTxn()) { + MycatServer.getInstance().getTxnLogProcessor().putTxnLog(c, sql); + } + } +} diff --git a/src/main/java/io/mycat/log/transaction/TxnLogProcessor.java b/src/main/java/io/mycat/log/transaction/TxnLogProcessor.java new file mode 100644 index 000000000..79f85f6f5 --- /dev/null +++ b/src/main/java/io/mycat/log/transaction/TxnLogProcessor.java @@ -0,0 +1,97 @@ +package io.mycat.log.transaction; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.util.Date; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.log4j.Logger; +import org.apache.log4j.helpers.ISO8601DateFormat; + +import io.mycat.MycatServer; +import io.mycat.buffer.BufferPool; +import io.mycat.config.MycatConfig; +import io.mycat.config.model.SystemConfig; +import io.mycat.log.DailyRotateLogStore; +import io.mycat.server.ServerConnection; +import io.mycat.util.TimeUtil; + +public class TxnLogProcessor extends Thread { + private static final Logger logger = Logger.getLogger(TxnLogProcessor.class); + private final DateFormat dateFormat; + private BlockingQueue queue; + private DailyRotateLogStore store; + + public TxnLogProcessor(BufferPool bufferPool) { + this.dateFormat = new ISO8601DateFormat(); + this.queue = new LinkedBlockingQueue(256); + MycatConfig mycatconfig = MycatServer.getInstance().getConfig(); + SystemConfig systemConfig = mycatconfig.getSystem(); + this.store = new DailyRotateLogStore(systemConfig.getTransactionLogBaseDir(), systemConfig.getTransactionLogBaseName(),"log",systemConfig.getTransactionRatateSize()); + } + + @Override + public void run() { + TxnBinaryLog log = null; + long flushTime = TimeUtil.currentTimeMillis(); + try { + store.open(); + for (;;) { + while ((log = queue.poll()) != null) { + writeLog(log); + } + long interval = TimeUtil.currentTimeMillis() - flushTime; + if (interval > 1000) { + store.force(false); + flushTime = TimeUtil.currentTimeMillis(); + } + try { + log = queue.take(); + } catch (InterruptedException e) { + } + writeLog(log); + } + } catch (IOException e) { + logger.warn("transaction log error:", e); + store.close(); + } + } + + private void writeLog(TxnBinaryLog log) throws IOException { + if (log == null) + return; + byte[] data = null; + try { + data = log.toString().getBytes("utf-8"); + } catch (UnsupportedEncodingException e) { + return; + } + ByteBuffer buffer = ByteBuffer.wrap(data); + store.write(buffer); + } + + public void putTxnLog(ServerConnection c, String sql) { + TxnBinaryLog log = new TxnBinaryLog(); + log.setUser(c.getUser()); + log.setHost(c.getHost()); + log.setSchema(c.getSchema()); + Date date = new Date(); + date.setTime(System.currentTimeMillis()); + log.setExecuteTime(dateFormat.format(date)); + log.setConnid(c.getId()); + if (c.isTxstart() || !c.isAutocommit()) { + log.setXid(c.getXid()); + } else { + log.setXid(c.getAndIncrementXid()); + } + log.setQuery(sql); + try { + this.queue.put(log); + } catch (InterruptedException e) { + // ignore + } + } +} diff --git a/src/main/java/io/mycat/server/ServerConnection.java b/src/main/java/io/mycat/server/ServerConnection.java index 01dd817ef..38190b38c 100644 --- a/src/main/java/io/mycat/server/ServerConnection.java +++ b/src/main/java/io/mycat/server/ServerConnection.java @@ -25,6 +25,7 @@ package io.mycat.server; import java.io.IOException; import java.nio.channels.NetworkChannel; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,12 +64,20 @@ public class ServerConnection extends FrontendConnection { * 标志是否执行了lock tables语句,并处于lock状态 */ private volatile boolean isLocked = false; + private AtomicLong xid; + public long getAndIncrementXid(){ + return xid.getAndIncrement(); + } + public long getXid(){ + return xid.get(); + } public ServerConnection(NetworkChannel channel) throws IOException { super(channel); this.txInterrupted = false; this.autocommit = true; + this.xid = new AtomicLong(1); } @Override diff --git a/src/main/java/io/mycat/server/ServerQueryHandler.java b/src/main/java/io/mycat/server/ServerQueryHandler.java index 3d39132a9..36320a659 100644 --- a/src/main/java/io/mycat/server/ServerQueryHandler.java +++ b/src/main/java/io/mycat/server/ServerQueryHandler.java @@ -71,7 +71,6 @@ public class ServerQueryHandler implements FrontendQueryHandler { // int rs = ServerParse.parse(sql); int sqlType = rs & 0xff; - switch (sqlType) { //explain sql case ServerParse.EXPLAIN: @@ -112,10 +111,10 @@ public class ServerQueryHandler implements FrontendQueryHandler { UseHandler.handle(sql, c, rs >>> 8); break; case ServerParse.COMMIT: - CommitHandler.handle(c); + CommitHandler.handle(sql, c); break; case ServerParse.ROLLBACK: - RollBackHandler.handle(c); + RollBackHandler.handle(sql, c); break; case ServerParse.HELP: LOGGER.warn(new StringBuilder().append("Unsupported command:").append(sql).toString()); diff --git a/src/main/java/io/mycat/server/handler/BeginHandler.java b/src/main/java/io/mycat/server/handler/BeginHandler.java index db7435bdb..1cd010bd7 100644 --- a/src/main/java/io/mycat/server/handler/BeginHandler.java +++ b/src/main/java/io/mycat/server/handler/BeginHandler.java @@ -23,6 +23,7 @@ */ package io.mycat.server.handler; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.net.mysql.OkPacket; import io.mycat.server.ServerConnection; @@ -31,10 +32,13 @@ public final class BeginHandler { if (c.isTxstart() || !c.isAutocommit()) { c.setTxstart(true); c.commit(); + TxnLogHelper.putTxnLog(c, "commit[because of begin]"); + c.getAndIncrementXid(); } else { c.setTxstart(true); c.write(c.writeToBuffer(OkPacket.OK, c.allocate())); } + TxnLogHelper.putTxnLog(c, stmt); } } \ No newline at end of file diff --git a/src/main/java/io/mycat/server/handler/CommitHandler.java b/src/main/java/io/mycat/server/handler/CommitHandler.java index f41815779..26ba1fe13 100644 --- a/src/main/java/io/mycat/server/handler/CommitHandler.java +++ b/src/main/java/io/mycat/server/handler/CommitHandler.java @@ -1,10 +1,13 @@ package io.mycat.server.handler; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.server.ServerConnection; public final class CommitHandler { - public static void handle(ServerConnection c) { - c.commit(); - c.setTxstart(false); - } + public static void handle(String stmt, ServerConnection c) { + c.commit(); + TxnLogHelper.putTxnLog(c, stmt); + c.setTxstart(false); + c.getAndIncrementXid(); + } } diff --git a/src/main/java/io/mycat/server/handler/RollBackHandler.java b/src/main/java/io/mycat/server/handler/RollBackHandler.java index 6ea043683..5dbc6ac4c 100644 --- a/src/main/java/io/mycat/server/handler/RollBackHandler.java +++ b/src/main/java/io/mycat/server/handler/RollBackHandler.java @@ -1,10 +1,12 @@ package io.mycat.server.handler; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.server.ServerConnection; public final class RollBackHandler { - public static void handle( ServerConnection c) { + public static void handle(String stmt, ServerConnection c) { c.rollback(); c.setTxstart(false); + TxnLogHelper.putTxnLog(c, stmt); } } diff --git a/src/main/java/io/mycat/server/handler/SetHandler.java b/src/main/java/io/mycat/server/handler/SetHandler.java index 3286cb7f7..9f9023380 100644 --- a/src/main/java/io/mycat/server/handler/SetHandler.java +++ b/src/main/java/io/mycat/server/handler/SetHandler.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import io.mycat.config.ErrorCode; import io.mycat.config.Isolations; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.net.mysql.OkPacket; import io.mycat.server.ServerConnection; import io.mycat.server.parser.ServerParseSet; @@ -68,13 +69,16 @@ public final class SetHandler { c.write(c.writeToBuffer(OkPacket.OK, c.allocate())); } else { c.commit(); + TxnLogHelper.putTxnLog(c, "commit[because of "+stmt+"]"); c.setTxstart(false); + c.getAndIncrementXid(); c.setAutocommit(true); } break; case AUTOCOMMIT_OFF: { if (c.isAutocommit()) { c.setAutocommit(false); + TxnLogHelper.putTxnLog(c, stmt); } c.write(c.writeToBuffer(AC_OFF, c.allocate())); break; diff --git a/src/main/java/io/mycat/server/handler/StartHandler.java b/src/main/java/io/mycat/server/handler/StartHandler.java index 33daf6d59..51d89c614 100644 --- a/src/main/java/io/mycat/server/handler/StartHandler.java +++ b/src/main/java/io/mycat/server/handler/StartHandler.java @@ -23,6 +23,7 @@ */ package io.mycat.server.handler; +import io.mycat.log.transaction.TxnLogHelper; import io.mycat.net.mysql.OkPacket; import io.mycat.server.ServerConnection; import io.mycat.server.parser.ServerParse; @@ -42,6 +43,7 @@ public final class StartHandler { c.setTxstart(true); c.writeToBuffer(OkPacket.OK, c.allocate()); } + TxnLogHelper.putTxnLog(c, stmt); break; default: c.execute(stmt, ServerParse.START);