Merge pull request #3009 from actiontech/fix/1444-master

fix: optimize split (cherry-picked & merged from 3.21.02.99)
This commit is contained in:
LUA
2021-12-15 15:09:06 +08:00
committed by GitHub
18 changed files with 1101 additions and 381 deletions
@@ -0,0 +1,21 @@
package com.actiontech.dble.btrace.provider;
/**
 * BTrace probe points for the split (dump-file) pipeline.
 *
 * <p>Each method is an intentionally empty hook: a BTrace script attaches to these
 * call sites at runtime to observe queue depths without touching production logic.
 * Do not add bodies or rename methods — external tracing scripts bind to these
 * exact signatures.
 */
public final class SplitFileProvider {
// utility/probe holder: never instantiated
private SplitFileProvider() {
}
// fired after a statement is put into the read (insert) queue
public static void getReadQueueSizeOfPut(int queueSize) {
}
// fired after a statement is polled from the read queue
public static void getReadQueueSizeOfPoll(int queueSize) {
}
// fired after a raw chunk is taken from the handle queue
public static void getHandleQueueSizeOfTake(int queueSize) {
}
// fired after a statement is put into a writer queue
public static void getWriteQueueSizeOfPut(int queueSize) {
}
// fired after a statement is taken from a writer queue
public static void getWriteQueueSizeOfTake(int queueSize) {
}
}
@@ -8,9 +8,10 @@ public class DumpFileConfig {
private String defaultSchema;
private int readQueueSize = 500;
private String writePath;
private int writeQueueSize = 500;
private int writeQueueSize = 512;
private int maxValues = 4000;
private int threadNum = 2;
private int bufferSize = 1048576;
private boolean isIgnore = false;
@@ -80,4 +81,12 @@ public class DumpFileConfig {
// Sets the number of parser threads used when splitting the dump file.
public void setThreadNum(int threadNum) {
this.threadNum = threadNum;
}
// Returns the read-buffer size in bytes used by DumpFileReader (default 1048576 = 1 MiB).
public int getBufferSize() {
return bufferSize;
}
// Sets the read-buffer size in bytes; larger values mean fewer, bigger file reads.
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}
@@ -29,7 +29,6 @@ public final class DumpFileContext {
private boolean isSkip = false;
private DumpFileWriter writer;
private List<ErrorMsg> errors;
private boolean needSkipError;
private DumpFileConfig config;
public DumpFileContext() {
@@ -42,7 +41,7 @@ public final class DumpFileContext {
}
public DumpFileContext(String schema, List<String> defaultShardingNodes, Set<String> allShardingNodes, DumpFileWriter writer, DumpFileConfig config,
String table, BaseTableConfig tableConfig, int partitionColumnIndex, int incrementColumnIndex, boolean isSkip, boolean needSkipError) {
String table, BaseTableConfig tableConfig, int partitionColumnIndex, int incrementColumnIndex, boolean isSkip) {
this.schema = schema;
this.defaultShardingNodes = defaultShardingNodes;
this.allShardingNodes = allShardingNodes;
@@ -54,7 +53,6 @@ public final class DumpFileContext {
this.partitionColumnIndex = partitionColumnIndex;
this.incrementColumnIndex = incrementColumnIndex;
this.isSkip = isSkip;
this.needSkipError = needSkipError;
}
public String getSchema() {
@@ -100,14 +98,13 @@ public final class DumpFileContext {
this.tableConfig = null;
return;
}
if (table.equalsIgnoreCase(this.table)) {
if (table.equals(this.table) || table.equalsIgnoreCase(this.table)) {
return;
}
this.table = table;
this.isSkip = false;
this.partitionColumnIndex = -1;
this.incrementColumnIndex = -1;
this.needSkipError = false;
if (this.schema == null) {
throw new DumpException("Can't tell which schema the table[" + table + "] belongs to.");
}
@@ -150,21 +147,27 @@ public final class DumpFileContext {
}
public void addError(String error) {
this.errors.add(new ErrorMsg(schema + "-" + table, error));
StringBuilder target = new StringBuilder(100);
if (schema != null) {
target.append(schema);
}
if (schema != null && table != null) {
target.append('-');
}
if (table != null) {
target.append(table);
}
if (schema == null && table == null) {
target = new StringBuilder("ERROR");
}
this.errors.add(new ErrorMsg(target.toString(), error));
}
// Returns the accumulated per-table error messages collected during the split run.
// NOTE(review): exposes the internal mutable list directly — callers can modify it.
public List<ErrorMsg> getErrors() {
return errors;
}
public boolean isNeedSkipError() {
return needSkipError;
}
public void setNeedSkipError(boolean needSkipError) {
this.needSkipError = needSkipError;
}
public DumpFileConfig getConfig() {
return config;
}
@@ -175,7 +178,7 @@ public final class DumpFileContext {
public DumpFileContext copyOf(DumpFileContext context) {
return new DumpFileContext(context.getSchema(), context.getDefaultShardingNodes(), context.getAllShardingNodes(), context.getWriter(), context.getConfig(),
context.getTable(), context.getTableConfig(), context.getPartitionColumnIndex(), context.getIncrementColumnIndex(), context.isSkip, context.isNeedSkipError());
context.getTable(), context.getTableConfig(), context.getPartitionColumnIndex(), context.getIncrementColumnIndex(), context.isSkip);
}
}
@@ -1,5 +1,6 @@
package com.actiontech.dble.services.manager.dump;
import com.actiontech.dble.btrace.provider.SplitFileProvider;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.parser.ServerParseFactory;
@@ -11,8 +12,8 @@ import com.alibaba.druid.sql.ast.SQLStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLSyntaxErrorException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
@@ -27,8 +28,10 @@ public final class DumpFileExecutor implements Runnable {
private final DumpFileContext context;
private final NameableExecutor nameableExecutor;
private final AtomicInteger threadNum = new AtomicInteger(0);
private final AtomicBoolean errorFlag;
public DumpFileExecutor(BlockingQueue<String> queue, BlockingQueue<String> insertQueue, DumpFileWriter writer, DumpFileConfig config, SchemaConfig schemaConfig, NameableExecutor nameableExecutor) {
public DumpFileExecutor(BlockingQueue<String> queue, BlockingQueue<String> insertQueue, DumpFileWriter writer, DumpFileConfig config,
SchemaConfig schemaConfig, NameableExecutor nameableExecutor, AtomicBoolean flag) {
this.ddlQueue = queue;
this.insertQueue = insertQueue;
this.context = new DumpFileContext(writer, config);
@@ -36,6 +39,7 @@ public final class DumpFileExecutor implements Runnable {
this.context.setDefaultSchema(schemaConfig);
}
this.nameableExecutor = nameableExecutor;
this.errorFlag = flag;
}
@Override
@@ -53,10 +57,11 @@ public final class DumpFileExecutor implements Runnable {
//thread pool expansion
expansionThreadPool();
stmt = this.insertQueue.poll();
if (StringUtil.isBlank(stmt)) {
if (StringUtil.isEmpty(stmt)) {
stmt = this.ddlQueue.poll();
}
if (StringUtil.isBlank(stmt)) {
SplitFileProvider.getReadQueueSizeOfPoll(this.insertQueue.size());
if (StringUtil.isEmpty(stmt)) {
continue;
}
int type = ServerParseFactory.getShardingParser().parse(stmt);
@@ -80,20 +85,19 @@ public final class DumpFileExecutor implements Runnable {
} else {
handler.handle(this.context, statement);
}
} catch (DumpException | SQLSyntaxErrorException e) {
} catch (DumpException e) {
assert stmt != null;
String currentStmt = stmt.length() <= 1024 ? stmt : stmt.substring(0, 1024);
this.context.setSkipContext(true);
LOGGER.warn("current stmt[" + currentStmt + "] error.", e);
this.context.addError("current stmt[" + currentStmt + "] error,because:" + e.getMessage());
} catch (InterruptedException ie) {
LOGGER.warn("dump file executor is interrupted.");
return;
} catch (Exception e) {
} catch (InterruptedException e) {
LOGGER.debug("dump file executor is interrupted.");
break;
} catch (Exception | Error e) {
LOGGER.warn("dump file executor exit", e);
this.context.addError("dump file executor exit, because:" + e.getMessage());
stopWriter(writer);
this.nameableExecutor.shutdown();
errorFlag.compareAndSet(false, true);
break;
}
}
@@ -132,18 +136,8 @@ public final class DumpFileExecutor implements Runnable {
return this.context;
}
private void stopWriter(DumpFileWriter writer) {
try {
writer.setDeleteFile(true);
writer.writeAll(DumpFileReader.EOF);
} catch (InterruptedException ex) {
// ignore
LOGGER.warn("dump file executor is interrupted.");
}
}
private boolean preHandle(DumpFileWriter writer, int type, String stmt) throws
InterruptedException, RuntimeException {
RuntimeException {
// push down statement util containing sharding
if (!(ServerParse.CREATE_DATABASE == type || ServerParse.USE == (0xff & type)) && this.context.getSchema() == null) {
if (ServerParse.DDL == type || ServerParse.INSERT == type || ServerParse.LOCK == type) {
@@ -153,8 +147,9 @@ public final class DumpFileExecutor implements Runnable {
return true;
}
// skip view
if (ServerParse.MYSQL_CMD_COMMENT == type || ServerParse.MYSQL_COMMENT == type) {
return skipView(stmt);
if ((ServerParse.MYSQL_CMD_COMMENT == type || ServerParse.MYSQL_COMMENT == type) && skipView(stmt)) {
context.setSkipContext(true);
return true;
}
// footer
if (stmt.contains("=@OLD_")) {
@@ -0,0 +1,162 @@
package com.actiontech.dble.services.manager.dump;
import com.actiontech.dble.btrace.provider.SplitFileProvider;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.parser.ServerParseFactory;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
/**
* @author Baofengqi
*/
public final class DumpFileHandler implements Runnable {
    public static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");
    // raw file chunks produced by DumpFileReader
    private final BlockingQueue<String> handleQueue;
    // parsed statements routed to DumpFileExecutor: DDL vs INSERT
    private final BlockingQueue<String> ddlQueue;
    private final BlockingQueue<String> insertQueue;
    // carries a statement fragment that straddles two raw chunks; null after EOF flush
    private StringBuilder tempStr = new StringBuilder(1000);
    private final NameableExecutor nameableExecutor;
    // shared error sink + flag so the coordinator can abort the whole split
    private Map<String, String> errorMap;
    private AtomicBoolean errorFlag;

    public DumpFileHandler(BlockingQueue<String> queue, BlockingQueue<String> insertDeque, BlockingQueue<String> handleQueue,
                           NameableExecutor nameableExecutor, Map<String, String> map, AtomicBoolean flag) {
        this.ddlQueue = queue;
        this.insertQueue = insertDeque;
        this.handleQueue = handleQueue;
        this.nameableExecutor = nameableExecutor;
        this.errorMap = map;
        this.errorFlag = flag;
    }

    /**
     * Drains raw chunks from {@code handleQueue}, re-assembles complete SQL
     * statements and forwards them downstream until the reader's EOF marker.
     */
    @Override
    public void run() {
        while (true) {
            try {
                String stmts = handleQueue.take();
                SplitFileProvider.getHandleQueueSizeOfTake(handleQueue.size());
                if (stmts.isEmpty()) {
                    continue;
                }
                if (stmts.equals(DumpFileReader.EOF)) {
                    // flush the trailing partial statement before propagating EOF
                    if (null != tempStr && !StringUtil.isBlank(tempStr.toString())) {
                        putSql(tempStr.toString());
                        this.tempStr = null;
                    }
                    putSql(DumpFileReader.EOF);
                    break;
                }
                readSQLByEOF(stmts);
            } catch (InterruptedException e) {
                LOGGER.debug("dump file handler is interrupted.");
                break;
            } catch (Exception | Error e) {
                // catch Exception too (not just Error) so an unexpected RuntimeException
                // also flips errorFlag instead of silently killing this thread;
                // this matches DumpFileExecutor's handling
                LOGGER.warn("dump file error", e);
                errorFlag.compareAndSet(false, true);
                errorMap.putIfAbsent("file handler error", "handler error,because:" + e.getMessage());
            }
        }
    }

    /**
     * Splits one raw chunk into statements terminated by ";\n" (unix) or ";\r\n"
     * (windows). The last piece is buffered in {@code tempStr} when the chunk
     * does not end on a statement boundary.
     */
    private void readSQLByEOF(String stmts) throws InterruptedException {
        // include ";\r\n": otherwise a windows chunk ending exactly on a statement
        // boundary buffers its last complete statement and fuses it with the next
        // chunk's first statement
        boolean endWithEOF = stmts.endsWith(";") || stmts.endsWith(";\n") || stmts.endsWith(";\r\n");
        List<String> strings = splitContent(stmts, ";\n", ";\r\n");
        int len = strings.size() - 1;
        int i = 0;
        if (len > 0 && tempStr != null && !StringUtil.isEmpty(tempStr.toString())) {
            // first piece completes the fragment buffered from the previous chunk
            tempStr.append(strings.get(0));
            putSql(tempStr.toString());
            tempStr = null;
            i = 1;
        }
        for (; i < len; i++) {
            if (!StringUtil.isEmpty(strings.get(i))) {
                putSql(strings.get(i));
            }
        }
        if (!endWithEOF) {
            // last piece is incomplete — keep it for the next chunk
            if (tempStr == null) {
                tempStr = new StringBuilder(strings.get(len));
            } else {
                tempStr.append(strings.get(len));
            }
        } else {
            if (tempStr != null && !StringUtil.isEmpty(tempStr.toString())) {
                tempStr.append(strings.get(len));
                putSql(tempStr.toString());
                tempStr = null;
            } else {
                if (!StringUtil.isEmpty(strings.get(len))) {
                    putSql(strings.get(len));
                }
            }
        }
    }

    /**
     * Splits {@code content} on statement separators, honouring whichever of the
     * two separators occurs first at each step so files with mixed unix/windows
     * line endings are split correctly. The original implementation always
     * preferred the unix separator and, once it fell back to windows mode, kept
     * advancing by the windows separator's length — skipping a character (or
     * fusing statements) on mixed-ending input.
     *
     * @return the pieces between separators; a single-element list holding
     *         {@code content} itself when no separator is present
     */
    public static List<String> splitContent(String content, String linuxSeparate, String windowsSeparate) {
        List<String> list = Lists.newArrayList();
        while (true) {
            int linuxIndex = content.indexOf(linuxSeparate);
            int windowsIndex = content.indexOf(windowsSeparate);
            int j;
            String separate;
            // pick the earliest match; ";\r\n" never contains ";\n", so the two
            // candidates cannot overlap
            if (windowsIndex >= 0 && (linuxIndex < 0 || windowsIndex < linuxIndex)) {
                j = windowsIndex;
                separate = windowsSeparate;
            } else {
                j = linuxIndex;
                separate = linuxSeparate;
            }
            if (j < 0) {
                if (!content.isEmpty()) {
                    list.add(content);
                }
                break;
            }
            list.add(content.substring(0, j));
            content = content.substring(j + separate.length());
        }
        if (list.isEmpty()) {
            list.add(content);
        }
        return list;
    }

    /**
     * Routes one complete statement: INSERTs go to {@code insertQueue}, everything
     * else to {@code ddlQueue}. Before enqueuing, spins until the *other* queue is
     * empty so DDL and data for the same table cannot be reordered downstream.
     */
    public void putSql(String sql) throws InterruptedException {
        if (StringUtil.isEmpty(sql)) {
            return;
        }
        int type = ServerParseFactory.getShardingParser().parse(sql);
        if (ServerParse.INSERT == type) {
            while (!this.ddlQueue.isEmpty() && !this.nameableExecutor.isShutdown()) {
                LockSupport.parkNanos(1000);
            }
            this.insertQueue.put(sql);
            SplitFileProvider.getReadQueueSizeOfPut(this.insertQueue.size());
        } else {
            while (!this.insertQueue.isEmpty() && !this.nameableExecutor.isShutdown()) {
                LockSupport.parkNanos(1000);
            }
            this.ddlQueue.put(sql);
        }
    }
}
@@ -1,11 +1,7 @@
package com.actiontech.dble.services.manager.dump;
import com.actiontech.dble.backend.mysql.store.fs.FileUtils;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.dump.handler.InsertHandler;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,67 +9,71 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.regex.Matcher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/**
* @author Baofengqi
*/
public final class DumpFileReader {
public final class DumpFileReader implements Runnable {
public static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");
public static final String EOF = "dump file eof";
public static final Pattern CREATE_VIEW = Pattern.compile("CREATE\\s+VIEW\\s+`?([a-zA-Z_0-9\\-_]+)`?\\s+", Pattern.CASE_INSENSITIVE);
public static final Pattern CREATE_VIEW1 = Pattern.compile("CREATE\\s+ALGORITHM", Pattern.CASE_INSENSITIVE);
private StringBuilder tempStr = new StringBuilder(200);
private final BlockingQueue<String> ddlQueue;
private final BlockingQueue<String> insertQueue;
private FileChannel fileChannel;
private long fileLength;
private long readLength;
private int readPercent;
private NameableExecutor nameableExecutor;
private final BlockingQueue<String> handleQueue;
private DumpFileConfig fileConfig;
private Map<String, String> errorMap;
private AtomicBoolean errorFlag;
public DumpFileReader(BlockingQueue<String> queue, BlockingQueue<String> insertQueue) {
this.ddlQueue = queue;
this.insertQueue = insertQueue;
public DumpFileReader(BlockingQueue<String> handleQueue, Map<String, String> map, AtomicBoolean flag) {
this.handleQueue = handleQueue;
this.errorMap = map;
this.errorFlag = flag;
}
public void open(String fileName) throws IOException {
public void open(String fileName, DumpFileConfig config) throws IOException {
this.fileChannel = FileUtils.open(fileName, "r");
this.fileLength = this.fileChannel.size();
this.fileConfig = config;
}
public void start(ManagerService service, NameableExecutor executor) throws IOException, InterruptedException {
this.nameableExecutor = executor;
@Override
public void run() {
LOGGER.info("begin to read dump file.");
TraceManager.TraceObject traceObject = TraceManager.threadTrace("dump-file-read");
try {
ByteBuffer buffer = ByteBuffer.allocate(0x20000);
ByteBuffer buffer = ByteBuffer.allocate(fileConfig.getBufferSize());
int byteRead = fileChannel.read(buffer);
while (byteRead != -1) {
if (service.getConnection().isClosed()) {
LOGGER.info("finish to read dump file, because the connection is closed.");
nameableExecutor.shutdownNow();
return;
}
readLength += byteRead;
float percent = ((float) readLength / (float) fileLength) * 100;
if (((int) percent) - readPercent > 5 || (int) percent == 100) {
readPercent = (int) percent;
LOGGER.info("dump file has bean read " + readPercent + "%");
}
readSQLByEOF(buffer.array(), byteRead);
String stmts = new String(buffer.array(), 0, byteRead, StandardCharsets.UTF_8);
this.handleQueue.put(stmts);
buffer.clear();
byteRead = fileChannel.read(buffer);
}
if (null != tempStr && !StringUtil.isBlank(tempStr.toString())) {
putSql(tempStr.toString());
this.tempStr = null;
}
putSql(EOF);
this.handleQueue.put(EOF);
} catch (IOException e) {
LOGGER.warn("dump file exception", e);
errorFlag.compareAndSet(false, true);
errorMap.putIfAbsent("file reader exception", "reader exception,because:" + e.getMessage());
} catch (InterruptedException e) {
LOGGER.debug("dump file reader is interrupted.");
} catch (Error e) {
LOGGER.warn("dump file error", e);
errorFlag.compareAndSet(false, true);
errorMap.putIfAbsent("file reader error", "reader error,because:" + e.getMessage());
} finally {
TraceManager.finishSpan(traceObject);
try {
@@ -86,62 +86,4 @@ public final class DumpFileReader {
}
}
}
// read one statement by ;
private void readSQLByEOF(byte[] linesByte, int byteRead) throws InterruptedException {
String stmts = new String(linesByte, 0, byteRead, StandardCharsets.UTF_8);
boolean endWithEOF = stmts.endsWith(";") || stmts.endsWith(";\n");
String[] lines = stmts.split(";\\r?\\n");
int len = lines.length - 1;
int i = 0;
if (len > 0 && tempStr != null && !StringUtil.isBlank(tempStr.toString())) {
tempStr.append(lines[0]);
putSql(tempStr.toString());
tempStr = null;
i = 1;
}
for (; i < len; i++) {
if (!StringUtil.isBlank(lines[i])) {
putSql(lines[i]);
}
}
if (!endWithEOF) {
if (tempStr == null) {
tempStr = new StringBuilder(lines[len]);
} else {
tempStr.append(lines[len]);
}
} else {
if (tempStr != null && !StringUtil.isBlank(tempStr.toString())) {
tempStr.append(lines[len]);
putSql(tempStr.toString());
tempStr = null;
} else {
if (!StringUtil.isBlank(lines[len])) {
putSql(lines[len]);
}
}
}
}
public void putSql(String sql) throws InterruptedException {
if (StringUtil.isBlank(sql)) {
return;
}
Matcher matcher = InsertHandler.INSERT_STMT.matcher(sql);
if (matcher.find()) {
while (!this.ddlQueue.isEmpty() && !this.nameableExecutor.isShutdown()) {
LockSupport.parkNanos(1000);
}
this.insertQueue.put(sql);
} else {
while (!this.insertQueue.isEmpty() && !this.nameableExecutor.isShutdown()) {
LockSupport.parkNanos(1000);
}
this.ddlQueue.put(sql);
}
}
}
@@ -2,28 +2,27 @@ package com.actiontech.dble.services.manager.dump;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.store.fs.FileUtils;
import com.actiontech.dble.route.parser.util.DruidUtil;
import com.actiontech.dble.services.manager.dump.handler.InsertHandler;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.services.manager.dump.handler.ShardingValuesHandler;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.TimeUtil;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.sql.SQLSyntaxErrorException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
public class DumpFileWriter {
@@ -33,6 +32,13 @@ public class DumpFileWriter {
private final AtomicInteger finished = new AtomicInteger(0);
private volatile boolean isDeleteFile = false;
private int maxValues;
// translator
private final EventTranslatorOneArg<Element, Object> translator = (event, sequence, arg0) -> event.set(arg0);
private Map<String, String> writerErrorMap;
public DumpFileWriter(Map<String, String> writerErrorMap) {
this.writerErrorMap = writerErrorMap;
}
public void open(String writePath, int writeQueueSize, int maxValue) throws IOException {
Set<String> shardingNodes = DbleServer.getInstance().getConfig().getShardingNodes().keySet();
@@ -46,60 +52,76 @@ public class DumpFileWriter {
}
public void start() {
Thread writer;
for (Map.Entry<String, ShardingNodeWriter> entry : shardingNodeWriters.entrySet()) {
writer = new Thread(entry.getValue(), entry.getKey() + "-writer-" + finished.incrementAndGet());
writer.start();
entry.getValue().self = writer;
finished.incrementAndGet();
entry.getValue().start();
}
}
public void stop() {
public void stop(boolean errorFlag) throws IOException {
for (Map.Entry<String, ShardingNodeWriter> entry : shardingNodeWriters.entrySet()) {
if (entry.getValue().self != null) {
entry.getValue().self.interrupt();
}
entry.getValue().close(errorFlag);
entry.getValue().disruptor.shutdown();
}
shardingNodeWriters.clear();
}
public void write(String shardingNode, String stmt) throws InterruptedException {
public void write(String shardingNode, String stmt) {
ShardingNodeWriter writer = this.shardingNodeWriters.get(shardingNode);
if (writer != null) {
writer.write(stmt);
}
}
public void writeInsertHeader(String shardingNode, String stmt) {
public void writeInsertHeader(String shardingNode, ShardingValuesHandler.InsertQuery insertQuery) {
ShardingNodeWriter writer = this.shardingNodeWriters.get(shardingNode);
if (writer != null) {
writer.write(stmt);
writer.write(insertQuery);
}
}
public void writeAll(String stmt) throws InterruptedException {
public void writeAll(String obj) {
for (ShardingNodeWriter writer : shardingNodeWriters.values()) {
writer.write(stmt);
writer.write(obj);
}
}
public boolean isFinished() {
return finished.get() == 0;
return finished.get() <= 0;
}
public void setDeleteFile(boolean deleteFile) {
this.isDeleteFile = deleteFile;
}
class ShardingNodeWriter implements Runnable {
private FileChannel fileChannel;
private final BlockingQueue<String> queue;
// Single-slot disruptor event. Holds either a plain SQL String or an
// InsertQuery; the ring buffer pre-allocates instances and the translator
// overwrites the slot for each publish.
static class Element {
// payload — presumably String or ShardingValuesHandler.InsertQuery; verify against publishers
private Object value;
Element() {
}
public Object get() {
return value;
}
public void set(Object val) {
this.value = val;
}
}
class ShardingNodeWriter {
private BufferedWriter bufferedWriter;
private final BlockingQueue<Object> queue;
private Disruptor<Element> disruptor;
private final int queueSize;
private final String shardingNode;
private String path;
private Thread self;
private String currentTable;
private int rows = 1;
private boolean isFirst = true;
private boolean error = false;
ShardingNodeWriter(String shardingNode, int queueSize) {
this.shardingNode = shardingNode;
@@ -107,119 +129,194 @@ public class DumpFileWriter {
this.queue = new ArrayBlockingQueue<>(queueSize);
}
public BlockingQueue<String> getQueue() {
public BlockingQueue<Object> getQueue() {
return queue;
}
void open(String fileName) throws IOException {
this.path = fileName;
this.fileChannel = FileUtils.open(fileName, "rw");
this.bufferedWriter = new BufferedWriter(new FileWriter(fileName));
}
void write(String stmt) {
try {
this.queue.put(stmt);
} catch (InterruptedException e) {
e.printStackTrace();
void write(Object obj) {
if (null == this.disruptor || error) {
return;
}
if (!disruptor.getRingBuffer().tryPublishEvent(translator, obj)) {
disruptor.getRingBuffer().publishEvent(translator, obj);
}
}
void close() throws IOException {
this.fileChannel.close();
shardingNodeWriters.remove(shardingNode);
if (isDeleteFile) {
void close(boolean errorFlag) throws IOException {
this.bufferedWriter.close();
if (isDeleteFile || errorFlag) {
FileUtils.delete(path);
}
}
public void setCurrentTable(String currentTable) {
this.currentTable = currentTable;
}
@Override
public void run() {
public void start() {
String wrapStr = ";\n";
try {
String stmt;
long startTime = TimeUtil.currentTimeMillis();
while (!Thread.currentThread().isInterrupted()) {
stmt = this.queue.take();
if (StringUtil.isBlank(stmt)) {
continue;
}
if (LOGGER.isDebugEnabled()) {
long endTime = TimeUtil.currentTimeMillis();
if (endTime - startTime > 1000) {
startTime = endTime;
if (queue.isEmpty()) {
LOGGER.debug("dump file executor parse statement slowly.");
} else if (this.queue.size() == queueSize) {
LOGGER.debug("dump file writer is slow, you can try increasing write queue size.");
}
}
}
if (stmt.equals(DumpFileReader.EOF)) {
this.fileChannel.write(ByteBuffer.wrap(wrapStr.getBytes()));
EventFactory<Element> factory = Element::new;
EventHandler<Element> handler = (element, sequence, endOfBatch) -> {
if (error) {
return;
}
try {
Object content = element.get();
if (null != content && content.equals(DumpFileReader.EOF)) {
this.bufferedWriter.write(wrapStr);
LOGGER.info("finish to write dump file.");
close();
close(false);
finished.decrementAndGet();
return;
}
writeContent(stmt, wrapStr);
}
} catch (IOException e) {
LOGGER.warn("dump file writer[" + shardingNode + "] occur error:" + e.getMessage());
} catch (SQLSyntaxErrorException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
finished.decrementAndGet();
try {
close();
} catch (IOException e) {
// ignore
LOGGER.warn("close dump file error, because:" + e.getMessage());
writeContent(content, wrapStr);
} catch (Exception | Error e) {
error = true;
finished.decrementAndGet();
writerErrorMap.putIfAbsent(this.shardingNode, "writer error,because:" + e.getMessage());
close(true);
}
};
SleepingWaitStrategy strategy = new SleepingWaitStrategy();
try {
disruptor = new Disruptor(factory, this.queueSize, new ThreadFactoryBuilder().setNameFormat("Split_Writer_" + shardingNode).build(), ProducerType.MULTI, strategy);
} catch (IllegalArgumentException e) {
throw new DumpException("The value of -w needs to be a power of 2");
}
disruptor.handleEventsWith(handler);
disruptor.setDefaultExceptionHandler(new SplitWriterExceptionHandler());
disruptor.start();
}
private void writeContent(String stmt, String wrapStr) throws IOException, SQLSyntaxErrorException {
private void writeContent(Object obj, String wrapStr) throws IOException {
String table = null;
Matcher matcher = InsertHandler.INSERT_STMT.matcher(stmt);
if (matcher.find()) {
table = matcher.group(2);
String content = null;
ShardingValuesHandler.InsertQuery insertQuery = null;
if (obj instanceof ShardingValuesHandler.InsertQuery) {
insertQuery = (ShardingValuesHandler.InsertQuery) obj;
table = StringUtil.removeBackQuote(insertQuery.getInsertQueryPos().getTableName());
} else {
content = wrapStr + obj;
}
if (table != null && table.equalsIgnoreCase(this.currentTable) && this.rows < maxValues) {
if (table != null && (table.equals(this.currentTable) || table.equalsIgnoreCase(this.currentTable)) && this.rows < maxValues) {
//splicing insert
MySqlInsertStatement insert = (MySqlInsertStatement) DruidUtil.parseMultiSQL(stmt);
if (insert.getValuesList().size() == 1) {
stmt = getSqlStr(insert.getValuesList().get(0).getValues());
}
writeInsertValue(insertQuery);
rows++;
} else if (!isFirst) {
stmt = wrapStr + stmt;
} else if (table != null) {
this.currentTable = table;
writeInsert(table, insertQuery, wrapStr);
rows = 1;
}
this.currentTable = table;
if (this.fileChannel != null) {
isFirst = false;
this.fileChannel.write(ByteBuffer.wrap(stmt.getBytes()));
} else {
if (this.bufferedWriter != null) {
this.bufferedWriter.write(content);
}
}
}
protected String getSqlStr(List<SQLExpr> values) {
StringBuilder sbValues = new StringBuilder();
sbValues.append(",(");
for (int i = 0; i < values.size(); i++) {
if (i != 0) {
sbValues.append(",");
}
sbValues.append(values.get(i).toString());
/**
 * Writes a fresh INSERT header for {@code table}, rebuilt from the parsed
 * query's character ranges, followed by this query's first VALUES tuple.
 * Called when the target table changes or {@code maxValues} rows have been
 * appended to the current statement; subsequent rows for the same table go
 * through writeInsertValue instead.
 * NOTE(review): assumes insertQuery is non-null whenever table is non-null —
 * TODO confirm the caller guarantees this.
 */
private void writeInsert(String table, ShardingValuesHandler.InsertQuery insertQuery, String wrapStr) throws IOException {
// rebuild "INSERT [IGNORE] INTO `table` [(cols)] VALUES (...)" from char ranges
StringBuilder insertHeader = new StringBuilder(200);
insertHeader.append("INSERT ");
if (insertQuery.getInsertQueryPos().isIgnore()) {
insertHeader.append("IGNORE ");
}
insertHeader.append("INTO ");
insertHeader.append('`');
insertHeader.append(table);
insertHeader.append('`');
if (!CollectionUtil.isEmpty(insertQuery.getInsertQueryPos().getColumns())) {
// column list copied verbatim from the original statement's char range
Integer start = insertQuery.getInsertQueryPos().getColumnRange().getKey();
Integer end = insertQuery.getInsertQueryPos().getColumnRange().getValue();
insertHeader.append(' ');
insertHeader.append('(');
insertHeader.append(insertQuery.getInsertQueryPos().getInsertChars(), start, end - start);
insertHeader.append(')');
insertHeader.append(' ');
}
insertHeader.append(" VALUES ");
insertHeader.append('(');
if (insertQuery.getIncrementColumnIndex() != -1) {
//has increment column: substitute the generated sequence value at its index
int index = 0;
for (Pair<Integer, Integer> pair : insertQuery.getValuePair()) {
Integer start = pair.getKey();
Integer end = pair.getValue();
if (index++ == insertQuery.getIncrementColumnIndex()) {
insertHeader.append(insertQuery.getIncrementColumnValue());
} else {
insertHeader.append(insertQuery.getInsertQueryPos().getInsertChars(), start, end - start);
}
if (index != insertQuery.getValuePair().size()) {
insertHeader.append(',');
}
}
} else {
// no increment column: copy the whole VALUES tuple as one char range
Integer start = insertQuery.getValuePair().get(0).getKey();
Integer end = insertQuery.getValuePair().get(insertQuery.getValuePair().size() - 1).getValue();
insertHeader.append(insertQuery.getInsertQueryPos().getInsertChars(), start, end - start);
}
insertHeader.append(')');
// wrapStr (";\n") terminates the previous statement before this new one
String content = wrapStr + insertHeader.toString();
if (this.bufferedWriter != null) {
this.bufferedWriter.write(content);
}
}
private void writeInsertValue(ShardingValuesHandler.InsertQuery insertQuery) throws IOException {
char[] insertChars = insertQuery.getInsertQueryPos().getInsertChars();
if (this.bufferedWriter != null) {
this.bufferedWriter.write(',');
this.bufferedWriter.write('(');
if (insertQuery.getIncrementColumnIndex() != -1) {
//has increment column
int index = 0;
for (Pair<Integer, Integer> pair : insertQuery.getValuePair()) {
Integer start = pair.getKey();
Integer end = pair.getValue();
if (index++ == insertQuery.getIncrementColumnIndex()) {
this.bufferedWriter.write(insertQuery.getIncrementColumnValue() + "");
} else {
this.bufferedWriter.write(insertQuery.getInsertQueryPos().getInsertChars(), start, end - start);
}
if (index != insertQuery.getValuePair().size()) {
this.bufferedWriter.write(',');
}
}
} else {
Integer start = insertQuery.getValuePair().get(0).getKey();
Integer end = insertQuery.getValuePair().get(insertQuery.getValuePair().size() - 1).getValue();
this.bufferedWriter.write(insertChars, start, end - start);
}
this.bufferedWriter.write(')');
}
sbValues.append(")");
return sbValues.toString();
}
}
// exception
public static final class SplitWriterExceptionHandler implements ExceptionHandler {
public SplitWriterExceptionHandler() {
}
@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
LOGGER.warn("Exception processing: {} {} ,exception{}", sequence, event, ex);
}
@Override
public void handleOnStartException(Throwable ex) {
LOGGER.error("Exception during onStart for split disruptor ,exception{}", ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
LOGGER.error("Exception during onShutdown for split disruptor ,exception{}", ex);
}
}
}
@@ -1,6 +1,8 @@
package com.actiontech.dble.services.manager.dump.handler;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.services.manager.dump.DumpFileContext;
import com.actiontech.dble.services.manager.dump.parse.InsertQueryPos;
import com.alibaba.druid.sql.ast.SQLExpr;
import java.sql.SQLNonTransientException;
@@ -8,47 +10,24 @@ import java.util.List;
public class DefaultValuesHandler {
StringBuilder insertHeader;
void setInsertHeader(StringBuilder insertHeader) {
this.insertHeader = insertHeader;
}
public void process(DumpFileContext context, InsertQueryPos insertQueryPos, List<Pair<Integer, Integer>> valuePair) throws SQLNonTransientException {
public void preProcess(DumpFileContext context) throws InterruptedException {
if (insertHeader == null) {
return;
}
for (String shardingNode : context.getTableConfig().getShardingNodes()) {
context.getWriter().write(shardingNode, insertHeader.toString());
}
}
public void postProcess(DumpFileContext context) throws InterruptedException {
for (String shardingNode : context.getTableConfig().getShardingNodes()) {
context.getWriter().write(shardingNode, ";");
}
}
public void process(DumpFileContext context, List<SQLExpr> values, boolean isFirst) throws InterruptedException, SQLNonTransientException {
for (String shardingNode : context.getTableConfig().getShardingNodes()) {
context.getWriter().write(shardingNode, toString(values, isFirst));
}
}
protected String toString(List<SQLExpr> values, boolean isFirst) {
StringBuilder sbValues = new StringBuilder();
StringBuilder sbValues = new StringBuilder(400);
if (!isFirst) {
sbValues.append(",");
sbValues.append(',');
}
sbValues.append("(");
sbValues.append('(');
for (int i = 0; i < values.size(); i++) {
if (i != 0) {
sbValues.append(",");
sbValues.append(',');
}
sbValues.append(values.get(i).toString());
}
sbValues.append(")");
sbValues.append(')');
return sbValues.toString();
}
}
@@ -3,21 +3,17 @@ package com.actiontech.dble.services.manager.dump.handler;
import com.actiontech.dble.config.model.sharding.table.ShardingTableConfig;
import com.actiontech.dble.meta.ColumnMeta;
import com.actiontech.dble.meta.TableMeta;
import com.actiontech.dble.route.parser.util.DruidUtil;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.parser.ServerParseFactory;
import com.actiontech.dble.services.manager.dump.DumpException;
import com.actiontech.dble.services.manager.dump.DumpFileContext;
import com.actiontech.dble.services.manager.dump.parse.InsertParser;
import com.actiontech.dble.services.manager.dump.parse.InsertQueryPos;
import com.actiontech.dble.singleton.ProxyMeta;
import com.actiontech.dble.singleton.SequenceManager;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
import java.sql.SQLNonTransientException;
import java.util.List;
@@ -31,15 +27,14 @@ public class InsertHandler extends DefaultHandler {
private final DefaultValuesHandler defaultValuesHandler = new DefaultValuesHandler();
@Override
public SQLStatement preHandle(DumpFileContext context, String stmt) throws DumpException, SQLNonTransientException {
DumpFileContext fileContext = new DumpFileContext().copyOf(context);
public SQLStatement preHandle(DumpFileContext fileContext, String stmt) throws DumpException, SQLNonTransientException {
// get table name simply
int type = ServerParseFactory.getShardingParser().parse(stmt);
String table = null;
Matcher matcher = InsertHandler.INSERT_STMT.matcher(stmt);
if (matcher.find()) {
table = matcher.group(2);
}
InsertParser insertParser = new InsertParser(stmt);
InsertQueryPos insertQueryPos = insertParser.parseStatement();
insertQueryPos.setInsertString(stmt);
String table = StringUtil.removeBackQuote(insertQueryPos.getTableName());
fileContext.setTable(table);
if (fileContext.isSkipContext() || !(fileContext.getTableConfig() instanceof ShardingTableConfig)) {
@@ -53,53 +48,29 @@ public class InsertHandler extends DefaultHandler {
return null;
}
MySqlInsertStatement insert = (MySqlInsertStatement) DruidUtil.parseMultiSQL(stmt);
// check columns from insert columns
checkColumns(fileContext, insert.getColumns());
// add
StringBuilder insertHeader = new StringBuilder("INSERT ");
if (insert.isIgnore() || fileContext.getConfig().isIgnore()) {
insert.setIgnore(true);
insertHeader.append("IGNORE ");
checkColumns(fileContext, insertQueryPos.getColumns());
if (fileContext.getConfig().isIgnore()) {
insertQueryPos.setIgnore(true);
}
insertHeader.append("INTO ");
insertHeader.append("`");
insertHeader.append(fileContext.getTable());
insertHeader.append("`");
if (!CollectionUtil.isEmpty(insert.getColumns())) {
insertHeader.append(insert.getColumns().toString());
}
insertHeader.append(" VALUES ");
DefaultValuesHandler valuesHandler;
if (fileContext.getTableConfig() instanceof ShardingTableConfig) {
valuesHandler = shardingValuesHandler;
} else {
valuesHandler = defaultValuesHandler;
}
valuesHandler.setInsertHeader(insertHeader);
try {
handleStatement(fileContext, insert, valuesHandler);
} catch (InterruptedException e) {
e.printStackTrace();
}
return insert;
handleStatement(fileContext, insertQueryPos, valuesHandler);
return null;
}
public void handleStatement(DumpFileContext context, SQLStatement sqlStatement, DefaultValuesHandler valuesHandler) throws InterruptedException {
MySqlInsertStatement insert = (MySqlInsertStatement) sqlStatement;
SQLInsertStatement.ValuesClause valueClause;
valuesHandler.preProcess(context);
for (int i = 0; i < insert.getValuesList().size(); i++) {
valueClause = insert.getValuesList().get(i);
public void handleStatement(DumpFileContext context, InsertQueryPos insertQueryPos, DefaultValuesHandler valuesHandler) {
for (List<Pair<Integer, Integer>> valuePair : insertQueryPos.getValueItemsRange()) {
try {
processIncrementColumn(context, valueClause.getValues());
valuesHandler.process(context, valueClause.getValues(), i == 0);
valuesHandler.process(context, insertQueryPos, valuePair);
} catch (SQLNonTransientException e) {
context.addError(e.getMessage());
}
}
valuesHandler.postProcess(context);
}
public void handleSQL(DumpFileContext context, String stmt) throws InterruptedException {
@@ -114,28 +85,12 @@ public class InsertHandler extends DefaultHandler {
@Override
public void handle(DumpFileContext context, SQLStatement sqlStatement) throws InterruptedException {
return;
}
@Override
public void handle(DumpFileContext context, String stmt) throws InterruptedException {
}
private void processIncrementColumn(DumpFileContext context, List<SQLExpr> values) throws SQLNonTransientException {
int incrementIndex = context.getIncrementColumnIndex();
if (incrementIndex == -1) {
return;
}
String tableKey = StringUtil.getFullName(context.getSchema(), context.getTable());
long val = SequenceManager.getHandler().nextId(tableKey, null);
SQLExpr value = values.get(incrementIndex);
if (!StringUtil.isEmpty(SQLUtils.toMySqlString(value)) && !context.isNeedSkipError()) {
context.addError("For table using global sequence, dble has set increment column values for you.");
context.setNeedSkipError(true);
}
values.set(incrementIndex, new SQLIntegerExpr(val));
return;
}
/**
@@ -145,7 +100,7 @@ public class InsertHandler extends DefaultHandler {
* @param context context
* @param columns columns
*/
private void checkColumns(DumpFileContext context, List<SQLExpr> columns) throws DumpException, SQLNonTransientException {
private void checkColumns(DumpFileContext context, List<String> columns) throws DumpException, SQLNonTransientException {
int partitionColumnIndex = context.getPartitionColumnIndex();
int incrementColumnIndex = context.getIncrementColumnIndex();
@@ -159,8 +114,7 @@ public class InsertHandler extends DefaultHandler {
if (tableConfig.getIncrementColumn() != null || tableConfig.getShardingColumn() != null) {
if (!CollectionUtil.isEmpty(columns)) {
for (int i = 0; i < columns.size(); i++) {
SQLExpr column = columns.get(i);
String columnName = StringUtil.removeBackQuote(column.toString());
String columnName = columns.get(i);
if (tableConfig.getIncrementColumn() != null && columnName.equalsIgnoreCase(tableConfig.getIncrementColumn())) {
incrementColumnIndex = i;
}
@@ -198,4 +152,5 @@ public class InsertHandler extends DefaultHandler {
context.setIncrementColumnIndex(incrementColumnIndex);
context.setPartitionColumnIndex(partitionColumnIndex);
}
}
@@ -17,7 +17,7 @@ public class SchemaHandler extends DefaultHandler {
@Override
public SQLStatement preHandle(DumpFileContext context, String stmt) throws SQLSyntaxErrorException {
String schema;
stmt = stmt.replace("/*!", "/*#");
stmt = stmt.replace("/*!", "/*#").replace("\r", "");
SQLStatement sqlStatement = DruidUtil.parseMultiSQL(stmt);
if (sqlStatement instanceof SQLUseStatement) {
SQLUseStatement use = (SQLUseStatement) sqlStatement;
@@ -1,12 +1,12 @@
package com.actiontech.dble.services.manager.dump.handler;
import com.actiontech.dble.config.model.sharding.table.ShardingTableConfig;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.services.manager.dump.DumpFileContext;
import com.actiontech.dble.route.function.AbstractPartitionAlgorithm;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.expr.SQLNullExpr;
import com.actiontech.dble.services.manager.dump.parse.InsertQueryPos;
import com.actiontech.dble.singleton.SequenceManager;
import com.actiontech.dble.util.StringUtil;
import java.sql.SQLNonTransientException;
import java.util.List;
@@ -14,46 +14,76 @@ import java.util.List;
public class ShardingValuesHandler extends DefaultValuesHandler {
@Override
public void preProcess(DumpFileContext context) {
}
@Override
public void postProcess(DumpFileContext context) {
}
@Override
public void process(DumpFileContext context, List<SQLExpr> values, boolean isFirst) throws SQLNonTransientException, InterruptedException {
Integer nodeIndex = handleShardingColumn(context, values);
public void process(DumpFileContext context, InsertQueryPos insertQueryPos, List<Pair<Integer, Integer>> valuePair) throws SQLNonTransientException {
Integer nodeIndex = handleShardingColumn(context, insertQueryPos, valuePair);
String shardingNode = context.getTableConfig().getShardingNodes().get(nodeIndex);
context.getWriter().writeInsertHeader(shardingNode, insertHeader.toString() + toString(values, true));
long incrementColumnVal = 0;
if (context.getIncrementColumnIndex() != -1) {
String tableKey = StringUtil.getFullName(context.getSchema(), context.getTable());
incrementColumnVal = SequenceManager.getHandler().nextId(tableKey, null);
}
context.getWriter().writeInsertHeader(shardingNode, new InsertQuery(insertQueryPos, valuePair, context.getIncrementColumnIndex(), incrementColumnVal));
}
private Integer handleShardingColumn(DumpFileContext context, List<SQLExpr> values) throws SQLNonTransientException {
private Integer handleShardingColumn(DumpFileContext context, InsertQueryPos insertQueryPos, List<Pair<Integer, Integer>> valuePair) throws SQLNonTransientException {
AbstractPartitionAlgorithm algorithm = ((ShardingTableConfig) context.getTableConfig()).getFunction();
SQLExpr expr = values.get(context.getPartitionColumnIndex());
String shardingValue = null;
if (expr instanceof SQLIntegerExpr) {
SQLIntegerExpr intExpr = (SQLIntegerExpr) expr;
shardingValue = intExpr.getNumber() + "";
} else if (expr instanceof SQLCharExpr) {
SQLCharExpr charExpr = (SQLCharExpr) expr;
shardingValue = charExpr.getText();
} // no need to consider SQLHexExpr
if (shardingValue == null && !(expr instanceof SQLNullExpr)) {
throw new SQLNonTransientException("Not Supported of Sharding Value EXPR :" + values.toString());
}
Pair<Integer, Integer> pair = valuePair.get(context.getPartitionColumnIndex());
String shardingValue = getValueString(insertQueryPos.getInsertChars(), pair);
Integer nodeIndex;
try {
nodeIndex = algorithm.calculate(shardingValue);
// null means can't find any valid index
if (nodeIndex == null || nodeIndex >= context.getTableConfig().getShardingNodes().size()) {
throw new SQLNonTransientException("can't find any valid shardingnode shardingValue" + values.toString());
throw new SQLNonTransientException("can't find any valid shardingnode shardingValue:" + shardingValue);
}
} catch (Exception e) {
throw new SQLNonTransientException("can't calculate valid shardingnode shardingValue" + values.toString() + ",due to " + e.getMessage());
throw new SQLNonTransientException("can't calculate valid shardingnode shardingValue,due to " + e.getMessage());
}
return nodeIndex;
}
/**
 * Extracts the sharding value text for the half-open range [start, end) of the
 * raw insert chars, stripping one pair of surrounding single quotes when
 * present (e.g. {@code 'abc'} becomes {@code abc}).
 * NOTE(review): escape sequences inside the quotes are not unescaped here —
 * presumably dump values in the sharding column never contain them; confirm.
 *
 * @param src   characters of the whole insert statement
 * @param range [start, end) positions of one value item
 * @return the value as a string, without enclosing quotes
 */
private String getValueString(char[] src, Pair<Integer, Integer> range) {
    int start = range.getKey();
    int end = range.getValue();
    // require at least 2 chars before stripping, so a lone quote (or an empty
    // range) is neither double-stripped nor read out of bounds via src[end - 1]
    if (end - start >= 2 && src[start] == '\'' && src[end - 1] == '\'') {
        start++;
        end--;
    }
    // copy the slice in one call instead of appending char-by-char
    return new String(src, start, end - start);
}
/**
 * One routed insert row: the positional metadata of the parsed statement plus
 * the value-item ranges of a single VALUES tuple, together with the
 * increment-column index/value the writer substitutes when rendering the row.
 */
public static class InsertQuery {
// positional metadata of the whole parsed INSERT statement
private final InsertQueryPos insertQueryPos;
// [start, end) range of each value item inside this tuple
private final List<Pair<Integer, Integer>> valuePair;
// index of the global-sequence column; -1 when the table has none
private final int incrementColumnIndex;
// pre-computed sequence value to write at that index
private final long incrementColumnValue;
public InsertQuery(InsertQueryPos insertQueryPos, List<Pair<Integer, Integer>> valuePair, int incrementColumnIndex, long incrementColumnValue) {
this.insertQueryPos = insertQueryPos;
this.valuePair = valuePair;
this.incrementColumnIndex = incrementColumnIndex;
this.incrementColumnValue = incrementColumnValue;
}
public long getIncrementColumnValue() {
return incrementColumnValue;
}
public int getIncrementColumnIndex() {
return incrementColumnIndex;
}
public InsertQueryPos getInsertQueryPos() {
return insertQueryPos;
}
public List<Pair<Integer, Integer>> getValuePair() {
return valuePair;
}
}
}
@@ -21,6 +21,7 @@ public class TableHandler extends DefaultHandler {
@Override
public SQLStatement preHandle(DumpFileContext context, String stmt) throws SQLSyntaxErrorException {
stmt = stmt.replace("\r", "");
SQLStatement sqlStatement = DruidUtil.parseMultiSQL(stmt);
String tableName;
if (sqlStatement instanceof MySqlCreateTableStatement) {
@@ -0,0 +1,18 @@
package com.actiontech.dble.services.manager.dump.parse;
import com.alibaba.druid.sql.parser.Lexer;
/**
 * Thin extension of druid's {@link Lexer} that exposes the protected position
 * fields, so {@code InsertParser} can record character ranges of columns and
 * values inside the raw SQL text.
 */
class InsertLexer extends Lexer {
InsertLexer(String input) {
super(input);
}
// start offset of the current token in the input (druid Lexer's startPos field)
public int getStartPos() {
return startPos;
}
// current scan position, i.e. one past the last consumed char (druid's pos field)
public int getPos() {
return pos;
}
}
@@ -0,0 +1,135 @@
package com.actiontech.dble.services.manager.dump.parse;
import com.actiontech.dble.route.parser.util.Pair;
import com.alibaba.druid.sql.parser.Token;
import java.util.ArrayList;
import java.util.List;
/**
 * Positional parser for INSERT/REPLACE statements in a dump file. Instead of
 * building a full AST it records character ranges of the table name, column
 * list and every VALUES tuple, so the split writer can copy fragments straight
 * out of the original SQL text without re-parsing.
 */
public class InsertParser {

    protected final InsertLexer lexer;

    public InsertParser(String sql) {
        lexer = new InsertLexer(sql);
        // prime the first token so token() is valid immediately
        lexer.nextToken();
    }

    /**
     * Scans forward for the next statement that starts with INSERT or REPLACE.
     *
     * @return start offset of that statement, or -1 when the input ends first
     */
    public int findInsert() {
        while (true) {
            if (lexer.token() == Token.SEMI) {
                lexer.nextToken();
                if (lexer.token() == Token.INSERT || lexer.token() == Token.REPLACE) {
                    return lexer.getStartPos();
                }
                lexer.nextToken();
            } else if (lexer.token() == Token.EOF) {
                return -1;
            } else {
                lexer.nextToken();
            }
        }
    }

    // return null means the input does not start with insert/replace;
    // a null QueryRange on the result means the statement was not terminated by ';'
    public InsertQueryPos parseStatement() {
        InsertQueryPos insertQueryPos = null;
        if (lexer.token() == Token.INSERT || lexer.token() == Token.REPLACE) {
            int start = lexer.getStartPos();
            insertQueryPos = new InsertQueryPos();
            if (lexer.token() == Token.REPLACE) {
                insertQueryPos.setReplace(true);
            }
            lexer.nextToken();
            // constant-first comparison: keyword tokens may leave stringVal() null
            if ("IGNORE".equalsIgnoreCase(lexer.stringVal())) {
                insertQueryPos.setIgnore(true);
                lexer.nextToken();
            }
            if (lexer.token() == Token.INTO) {
                lexer.nextToken();
                String tableName = lexer.stringVal();
                insertQueryPos.setTableName(tableName);
                lexer.nextToken();
            }
            // optional explicit column list: "(col1, col2, ...)"
            if (lexer.token() == Token.LPAREN) {
                lexer.nextToken();
                int colStart = lexer.getStartPos();
                this.parseColumnNameList(insertQueryPos);
                int colEnd = lexer.getStartPos();
                Pair<Integer, Integer> columnRange = new Pair<>(colStart, colEnd);
                insertQueryPos.setColumnRange(columnRange);
                this.accept(Token.RPAREN);
            }
            if (lexer.token() == Token.VALUES) {
                lexer.nextToken();
                // one iteration per "(v1, v2, ...)" tuple
                while (true) {
                    if (lexer.token() == Token.LPAREN) {
                        lexer.nextToken();
                        int valueStart = lexer.getStartPos();
                        List<Pair<Integer, Integer>> valueItemList = new ArrayList<>();
                        this.parseValueList(valueItemList);
                        insertQueryPos.getValueItemsRange().add(valueItemList);
                        int valueEnd = lexer.getStartPos();
                        Pair<Integer, Integer> valueRange = new Pair<>(valueStart, valueEnd);
                        insertQueryPos.getValuesRange().add(valueRange);
                        this.accept(Token.RPAREN);
                    }
                    if (lexer.token() != Token.COMMA) {
                        break;
                    }
                    lexer.nextToken();
                }
            }
            // only a fully terminated statement gets a query range
            if (accept(Token.SEMI)) {
                int end = lexer.getStartPos();
                insertQueryPos.setQueryRange(new Pair<>(start, end));
            }
        }
        return insertQueryPos;
    }

    /** Consumes the expected token and returns true; on mismatch consumes nothing. */
    private boolean accept(Token token) {
        if (lexer.token() == token) {
            lexer.nextToken();
            return true;
        }
        return false;
    }

    /**
     * Reads the comma-separated column-name list, recording each name with its
     * ordinal for the later sharding/increment column lookup.
     * NOTE(review): back-quoted columns keep their quotes here — confirm the
     * consumers of getColumns() expect raw (possibly quoted) names.
     */
    private void parseColumnNameList(InsertQueryPos insertQueryPos) {
        int i = 0;
        while (true) {
            // use the identifier text, not the token-type name: Token.IDENTIFIER
            // has a null name, so token().name would record null for every column
            String colName = lexer.stringVal();
            insertQueryPos.getColNameIndexMap().put(colName, i);
            insertQueryPos.getColumns().add(colName);
            lexer.nextToken();
            if (lexer.token() != Token.COMMA) {
                return;
            }
            lexer.nextToken();
            i++;
        }
    }

    /**
     * Records the [start, end) range of each value item of one VALUES tuple.
     * Assumes every item is a single token (literal/NULL); a multi-token
     * expression would be truncated — TODO confirm dump files never emit them.
     */
    private void parseValueList(List<Pair<Integer, Integer>> valueItemList) {
        while (true) {
            int valueItemStart = lexer.getStartPos();
            int valueItemEnd = lexer.getPos();
            Pair<Integer, Integer> valueRange = new Pair<>(valueItemStart, valueItemEnd);
            valueItemList.add(valueRange);
            lexer.nextToken();
            if (lexer.token() != Token.COMMA) {
                return;
            }
            // druid's fast path for scanning the literal right after a comma
            lexer.nextTokenValue();
        }
    }
}
@@ -0,0 +1,104 @@
package com.actiontech.dble.services.manager.dump.parse;
import com.actiontech.dble.route.parser.util.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Positional description of one parsed INSERT/REPLACE statement. All positions
 * are character offsets into the original SQL text, stored as half-open
 * [start, end) ranges so fragments can be copied out without re-parsing.
 */
public class InsertQueryPos {

    // raw statement text and its lazily materialized char form
    private String insertString;
    private char[] insertChars;
    // range of the whole statement; set only when a terminating ';' was seen
    private Pair<Integer, Integer> queryRange;
    // one range per VALUES tuple
    private final List<Pair<Integer, Integer>> valuesRange = new ArrayList<>();
    // per tuple: one range per value item
    private final List<List<Pair<Integer, Integer>>> valueItemsRange = new ArrayList<>();
    // range of the column-name list; null when the statement has none
    private Pair<Integer, Integer> columnRange;
    private boolean isIgnore = false;
    private boolean isReplace = false;
    private String tableName;
    // column name -> ordinal position in the column list
    private final Map<String, Integer> colNameIndexMap = new HashMap<>();
    private final List<String> columns = new ArrayList<>();

    public boolean isIgnore() {
        return isIgnore;
    }

    public void setIgnore(boolean ignore) {
        isIgnore = ignore;
    }

    public boolean isReplace() {
        return isReplace;
    }

    public void setReplace(boolean replace) {
        isReplace = replace;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public Map<String, Integer> getColNameIndexMap() {
        return colNameIndexMap;
    }

    public List<String> getColumns() {
        return columns;
    }

    public Pair<Integer, Integer> getColumnRange() {
        return columnRange;
    }

    public void setColumnRange(Pair<Integer, Integer> columnRange) {
        this.columnRange = columnRange;
    }

    public List<Pair<Integer, Integer>> getValuesRange() {
        return valuesRange;
    }

    public List<List<Pair<Integer, Integer>>> getValueItemsRange() {
        return valueItemsRange;
    }

    public Pair<Integer, Integer> getQueryRange() {
        return queryRange;
    }

    public void setQueryRange(Pair<Integer, Integer> queryRange) {
        this.queryRange = queryRange;
    }

    // materialized lazily from insertString; reset again by clear()
    public char[] getInsertChars() {
        if (insertChars == null) {
            insertChars = insertString.toCharArray();
        }
        return insertChars;
    }

    public void setInsertString(String insertString) {
        this.insertString = insertString;
    }

    /** Resets every field so the instance can be reused for the next statement. */
    public void clear() {
        insertChars = null;
        insertString = null;
        queryRange = null;
        valuesRange.clear();
        valueItemsRange.clear();
        columnRange = null;
        colNameIndexMap.clear();
        columns.clear();
    }
}
@@ -12,21 +12,25 @@ import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.ExecutorUtil;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public final class SplitDumpHandler {
private static final Pattern SPLIT_STMT = Pattern.compile("([^\\s]+)\\s+([^\\s]+)(((\\s*(-s([^\\s]+))?)|(\\s+(-r(\\d+))?)|(\\s+(-w(\\d+))?)|(\\s+(-l(\\d+))?)|(\\s+(--ignore)?)|(\\s+(-t(\\d+))?))+)", Pattern.CASE_INSENSITIVE);
private static final Pattern SPLIT_STMT = Pattern.compile("([^\\s]+)\\s+([^\\s]+)(((\\s*(-s([^\\s]+))?)|(\\s+(-r(\\d+))?)|(\\s+(-w(\\d+))?)|(\\s+(-l(\\d+))?)|(\\s+(--ignore)?)|(\\s+(-t(\\d+))?)|(\\s+(-b(\\d+))?))+)", Pattern.CASE_INSENSITIVE);
public static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");
private SplitDumpHandler() {
}
@@ -50,23 +54,35 @@ public final class SplitDumpHandler {
}
}
DumpFileWriter writer = new DumpFileWriter();
ArrayBlockingQueue<String> deque = new ArrayBlockingQueue<>(config.getReadQueueSize());
ArrayBlockingQueue<String> insertDeque = new ArrayBlockingQueue<>(config.getReadQueueSize());
DumpFileReader reader = new DumpFileReader(deque, insertDeque);
NameableExecutor nameableExecutor = ExecutorUtil.createFixed("dump-file-executor", config.getThreadNum());
DumpFileExecutor dumpFileExecutor = new DumpFileExecutor(deque, insertDeque, writer, config, defaultSchemaConfig, nameableExecutor);
BlockingQueue<String> deque = new ArrayBlockingQueue<>(config.getReadQueueSize());
BlockingQueue<String> insertDeque = new ArrayBlockingQueue<>(config.getReadQueueSize());
BlockingQueue<String> handleQueue = new ArrayBlockingQueue<>(config.getReadQueueSize());
NameableExecutor fileExecutor = ExecutorUtil.createFixed("Split_Executor", config.getThreadNum());
NameableExecutor fileHandlerExecutor = ExecutorUtil.createFixed("Split_Handler", 1);
NameableExecutor fileReadExecutor = ExecutorUtil.createFixed("Split_Reader", 1);
Map<String, String> errorMap = Maps.newConcurrentMap();
AtomicBoolean errorFlag = new AtomicBoolean(false);
//thread
DumpFileReader reader = new DumpFileReader(handleQueue, errorMap, errorFlag);
DumpFileHandler fileHandler = new DumpFileHandler(deque, insertDeque, handleQueue, fileExecutor, errorMap, errorFlag);
DumpFileWriter writer = new DumpFileWriter(errorMap);
DumpFileExecutor dumpFileExecutor = new DumpFileExecutor(deque, insertDeque, writer, config, defaultSchemaConfig, fileExecutor, errorFlag);
try {
// thread for process statement
nameableExecutor.execute(dumpFileExecutor);
reader.open(config.getReadFile(), config);
writer.open(config.getWritePath() + FileUtils.getName(config.getReadFile()), config.getWriteQueueSize(), config.getMaxValues());
// start write
writer.start();
// thread for process statement
fileExecutor.execute(dumpFileExecutor);
// firstly check file
reader.open(config.getReadFile());
fileHandlerExecutor.execute(fileHandler);
// start read
reader.start(service, nameableExecutor);
} catch (IOException | InterruptedException e) {
fileReadExecutor.execute(reader);
} catch (IOException e) {
LOGGER.info("finish to split dump file because " + e.getMessage());
service.writeErrMessage(ErrorCode.ER_IO_EXCEPTION, e.getMessage());
return;
@@ -74,19 +90,23 @@ public final class SplitDumpHandler {
List<ErrorMsg> errors = dumpFileExecutor.getContext().getErrors();
if (!CollectionUtil.isEmpty(errors)) {
recycleThread(fileReadExecutor, fileHandlerExecutor, fileExecutor, writer, true, errorMap);
DumpFileError.execute(service, errors);
return;
}
while (!service.getConnection().isClosed() && !writer.isFinished()) {
while (!service.getConnection().isClosed() && !writer.isFinished() && !errorFlag.get()) {
LockSupport.parkNanos(1000);
}
nameableExecutor.shutdown();
//recycling thread
recycleThread(fileReadExecutor, fileHandlerExecutor, fileExecutor, writer, errorFlag.get(), errorMap);
errorMap.forEach((key, value) -> errors.add(new ErrorMsg(key, value)));
if (service.getConnection().isClosed()) {
LOGGER.info("finish to split dump file because the connection is closed.");
nameableExecutor.shutdownNow();
writer.stop();
//recycling thread
recycleThread(fileReadExecutor, fileHandlerExecutor, fileExecutor, writer, errorFlag.get(), errorMap);
service.writeErrMessage(ErrorCode.ER_IO_EXCEPTION, "finish to split dump file due to the connection is closed.");
return;
}
@@ -95,6 +115,21 @@ public final class SplitDumpHandler {
LOGGER.info("finish to split dump file.");
}
private static void recycleThread(NameableExecutor fileReadExecutor, NameableExecutor fileHandlerExecutor, NameableExecutor fileExecutor,
DumpFileWriter writer, boolean errorFlag, Map<String, String> errorMap) {
//recycling thread
fileReadExecutor.shutdownNow();
fileHandlerExecutor.shutdownNow();
fileExecutor.shutdownNow();
try {
writer.stop(errorFlag);
} catch (IOException e) {
LOGGER.warn("error,because:", e);
errorMap.putIfAbsent("error", "dump file error,because:" + e.getMessage());
}
}
private static void printMsg(List<ErrorMsg> errors, ManagerService service) {
if (CollectionUtil.isEmpty(errors)) {
OkPacket packet = new OkPacket();
@@ -133,6 +168,9 @@ public final class SplitDumpHandler {
if (m.group(21) != null) {
config.setThreadNum(Integer.parseInt(m.group(21)));
}
if (m.group(24) != null) {
config.setBufferSize(Integer.parseInt(m.group(24)));
}
}
return config;
}
@@ -30,7 +30,7 @@ public final class DumpFileError {
byte packetId = 0;
HEADER.setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("TABLE", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i] = PacketUtil.getField("SUMMARY", Fields.FIELD_TYPE_VAR_STRING);
FIELDS[i++].setPacketId(++packetId);
FIELDS[i] = PacketUtil.getField("DETAIL", Fields.FIELD_TYPE_VAR_STRING);
@@ -0,0 +1,231 @@
package com.actiontech.dble.manager;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.services.manager.dump.parse.InsertParser;
import com.actiontech.dble.services.manager.dump.parse.InsertQueryPos;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class SplitCSVTest {
// REPLACE with an explicit column list: verify the query range covers the whole
// statement, the replace/ignore flags, the table name, and the recorded ranges.
@Test
public void test1() {
String sql = "replace into table1(col1,column2) values(1,'3'),(2,'5'),(3,';,');";
char[] sqlArray = sql.toCharArray();
InsertParser obj = new InsertParser(sql);
InsertQueryPos pos = obj.parseStatement();
Assert.assertEquals(0, pos.getQueryRange().getKey().intValue());
Assert.assertEquals(sql.length(), pos.getQueryRange().getValue().intValue());
Assert.assertTrue(pos.isReplace());
Assert.assertFalse(pos.isIgnore());
Assert.assertEquals("table1", pos.getTableName());
Assert.assertEquals("col1,column2", getSubString(sqlArray, pos.getColumnRange()));
List<Pair<Integer, Integer>> valueRange = pos.getValuesRange();
// first value tuple
Assert.assertEquals("1,'3'", getSubString(sqlArray, valueRange.get(0)));
// 3rd value tuple: contains ';' and ',' inside a string literal
Assert.assertEquals("3,';,'", getSubString(sqlArray, valueRange.get(2)));
// 3rd value tuple, 2nd column
Pair<Integer, Integer> valueItemRange = pos.getValueItemsRange().get(2).get(1);
Assert.assertEquals("';,'", getSubString(sqlArray, valueItemRange));
}
// INSERT IGNORE with a column list: same ranges as test1 but the ignore flag
// must be set and the replace flag clear.
@Test
public void test2() {
String sql = "insert ignore into table1(col1,column2) values(1,'3'),(2,'5'),(3,';,');";
char[] sqlArray = sql.toCharArray();
InsertParser obj = new InsertParser(sql);
InsertQueryPos pos = obj.parseStatement();
Assert.assertEquals(0, pos.getQueryRange().getKey().intValue());
Assert.assertEquals(sql.length(), pos.getQueryRange().getValue().intValue());
Assert.assertFalse(pos.isReplace());
Assert.assertTrue(pos.isIgnore());
Assert.assertEquals("table1", pos.getTableName());
Assert.assertEquals("col1,column2", getSubString(sqlArray, pos.getColumnRange()));
List<Pair<Integer, Integer>> valueRange = pos.getValuesRange();
// first value tuple
Assert.assertEquals("1,'3'", getSubString(sqlArray, valueRange.get(0)));
// 3rd value tuple: contains ';' and ',' inside a string literal
Assert.assertEquals("3,';,'", getSubString(sqlArray, valueRange.get(2)));
// 3rd value tuple, 2nd column
Pair<Integer, Integer> valueItemRange = pos.getValueItemsRange().get(2).get(1);
Assert.assertEquals("';,'", getSubString(sqlArray, valueItemRange));
}
// INSERT without a column list: ranges still recorded, and getValueString
// must strip the surrounding quotes of string literals.
@Test
public void test3() {
String sql = "insert into table1 values(1,'3'),(2,'5'),(3,';,');";
char[] sqlArray = sql.toCharArray();
InsertParser obj = new InsertParser(sql);
InsertQueryPos pos = obj.parseStatement();
Assert.assertEquals(0, pos.getQueryRange().getKey().intValue());
Assert.assertEquals(sql.length(), pos.getQueryRange().getValue().intValue());
Assert.assertFalse(pos.isReplace());
Assert.assertFalse(pos.isIgnore());
Assert.assertEquals("table1", pos.getTableName());
List<Pair<Integer, Integer>> valueRange = pos.getValuesRange();
// first value tuple
Assert.assertEquals("1,'3'", getSubString(sqlArray, valueRange.get(0)));
// 3rd value tuple
Assert.assertEquals("3,';,'", getSubString(sqlArray, valueRange.get(2)));
// 3rd value tuple, 2nd column: raw slice keeps quotes, value string strips them
Pair<Integer, Integer> valueItemRange = pos.getValueItemsRange().get(2).get(1);
Assert.assertEquals("';,'", getSubString(sqlArray, valueItemRange));
Assert.assertEquals(";,", getValueString(sqlArray, valueItemRange));
Pair<Integer, Integer> valueItemRange1 = pos.getValueItemsRange().get(2).get(0);
Assert.assertEquals("3", getValueString(sqlArray, valueItemRange1));
}
// Truncated statements: only a ';'-terminated statement gets a query range;
// anything cut short yields a null range, and a non-insert prefix yields null.
@Test
public void test4() {
Assert.assertNotNull(getInsertQueryPos("insert ignore into table1(col1,column2) values(1,'3'),(2,'5'),(3,';,');").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2) values(1,'3'),(2,'5'),(3,';,')").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2) values(1,'3'),(2,'5'),(3,';,'").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2) values(1,'3'),(2,'5'),(3,';").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2) values(1,").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2) values").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2) val").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,column2)").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col1,").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table1(col").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into table").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore into ").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore in ").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ignore ").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert ign").getQueryRange());
Assert.assertNull(getInsertQueryPos("insert").getQueryRange());
// a prefix that is not "insert" produces no InsertQueryPos at all
Assert.assertNull(getInsertQueryPos("inser"));
}
// Two statements in one buffer: calling parseStatement twice advances past the
// first insert; the second result's query range starts after the first statement.
@Test
public void test5() {
String s1 = "insert into table1 values(4,'z'),(5,'y'),(6,'x');\n";
String s2 = "insert into table1 values(1,'3'),(2,'5'),(3,';,');";
String sql = s1 + s2;
char[] sqlArray = sql.toCharArray();
InsertParser obj = new InsertParser(sql);
InsertQueryPos pos1 = obj.parseStatement(); // first call consumes the first statement (result unused)
InsertQueryPos pos = obj.parseStatement(); //second call parses the second statement
Assert.assertEquals(s1.length(), pos.getQueryRange().getKey().intValue());
Assert.assertEquals(sql.length(), pos.getQueryRange().getValue().intValue());
Assert.assertFalse(pos.isReplace());
Assert.assertFalse(pos.isIgnore());
Assert.assertEquals("table1", pos.getTableName());
List<Pair<Integer, Integer>> valueRange = pos.getValuesRange();
// first value tuple
Assert.assertEquals("1,'3'", getSubString(sqlArray, valueRange.get(0)));
// 3rd value tuple
Assert.assertEquals("3,';,'", getSubString(sqlArray, valueRange.get(2)));
// 3rd value tuple, 2nd column
Pair<Integer, Integer> valueItemRange = pos.getValueItemsRange().get(2).get(1);
Assert.assertEquals("';,'", getSubString(sqlArray, valueItemRange));
Assert.assertEquals(";,", getValueString(sqlArray, valueItemRange));
Pair<Integer, Integer> valueItemRange1 = pos.getValueItemsRange().get(2).get(0);
Assert.assertEquals("3", getValueString(sqlArray, valueItemRange1));
}
// A statement split across two read chunks: parse the first chunk to its last
// complete statement, find the first insert in the second chunk, stitch the
// dangling tail of chunk 1 onto the head of chunk 2, and parse the result.
@Test
public void test6() {
String s11 = ");\ninsert into table1 values(4,'z'),(5,'y'),(6,'x');\n";
String s12 = "insert into table1 values(7,'z7'),(8,'8'),(9,'9');\n";
String s13 = "insert into table1 value";
String s21 = "s(1,'3'),(2,'5'),(3,';,');\n";
String s22 = "insert into table1 values(10,'10'),(11,'11'),(12,'12');\n";
String s23 = "insert into table1 values(13,'13'),(14";
String s1 = s11 + s12 + s13;
String s2 = s21 + s22 + s23;
char[] sqlArray1 = s1.toCharArray();
char[] sqlArray2 = s2.toCharArray();
InsertParser obj1 = new InsertParser(s1);
obj1.findInsert();
InsertQueryPos tmpPos = obj1.parseStatement();
int lastEndPos = 0;
// not insert query or insert not finish
while (tmpPos != null && tmpPos.getQueryRange() != null) {
//do something about tmpPos
lastEndPos = tmpPos.getQueryRange().getValue();
tmpPos = obj1.parseStatement();
}
InsertParser obj2 = new InsertParser(s2);
// may need loop
int nextStartPos = obj2.findInsert();
// concatenate: tail of chunk 1 (after the last complete statement) + head of chunk 2
char[] sqlArray = genCatChars(sqlArray1, sqlArray2, lastEndPos, nextStartPos);
String catQuery = new String(sqlArray);
InsertParser obj = new InsertParser(catQuery);
InsertQueryPos pos = obj.parseStatement();
// then do obj2 ... obj2.parseStatement();
Assert.assertFalse(pos.isReplace());
Assert.assertFalse(pos.isIgnore());
Assert.assertEquals("table1", pos.getTableName());
List<Pair<Integer, Integer>> valueRange = pos.getValuesRange();
// first value tuple
Assert.assertEquals("1,'3'", getSubString(sqlArray, valueRange.get(0)));
// 3rd value tuple
Assert.assertEquals("3,';,'", getSubString(sqlArray, valueRange.get(2)));
// 3rd value tuple, 2nd column
Pair<Integer, Integer> valueItemRange = pos.getValueItemsRange().get(2).get(1);
Assert.assertEquals("';,'", getSubString(sqlArray, valueItemRange));
Assert.assertEquals(";,", getValueString(sqlArray, valueItemRange));
Pair<Integer, Integer> valueItemRange1 = pos.getValueItemsRange().get(2).get(0);
Assert.assertEquals("3", getValueString(sqlArray, valueItemRange1));
}
// Parses the first statement of the given sql text.
private InsertQueryPos getInsertQueryPos(String sql) {
    return new InsertParser(sql).parseStatement();
}
// Returns the half-open [key, value) slice of src as a String.
private String getSubString(char[] src, Pair<Integer, Integer> range) {
    return new String(src, range.getKey(), range.getValue() - range.getKey());
}
//utils
// Concatenates the tail of lastChar (from lastEndPos) with the head of
// nextChar (up to nextStartPos) into one new array.
private char[] genCatChars(char[] lastChar, char[] nextChar, int lastEndPos, int nextStartPos) {
    int tailLength = lastChar.length - lastEndPos;
    char[] merged = new char[tailLength + nextStartPos];
    System.arraycopy(lastChar, lastEndPos, merged, 0, tailLength);
    System.arraycopy(nextChar, 0, merged, tailLength, nextStartPos);
    return merged;
}
// Returns the [key, value) slice of src, with one pair of surrounding
// single quotes stripped if present.
private String getValueString(char[] src, Pair<Integer, Integer> range) {
    int from = range.getKey();
    int to = range.getValue();
    if (src[from] == '\'' && src[to - 1] == '\'') {
        from++;
        to--;
    }
    return new String(src, from, to - from);
}
// Appends the [key, value) slice of src to target in one bulk call.
private void addSubString(char[] src, Pair<Integer, Integer> range, StringBuilder target) {
    int offset = range.getKey();
    target.append(src, offset, range.getValue() - offset);
}
}