提交 95b26c4d 编写于 作者: A agapple

fixed #1433, optimzier code

上级 b114e0cb
......@@ -107,8 +107,9 @@ public class RdbSyncService {
int j = i;
futures.add(executorThreads[i].submit(() -> {
try {
dmlsPartition[j]
.forEach(syncItem -> sync(batchExecutors[j], syncItem.config, syncItem.singleDml));
dmlsPartition[j].forEach(syncItem -> sync(batchExecutors[j],
syncItem.config,
syncItem.singleDml));
dmlsPartition[j].clear();
batchExecutors[j].commit();
return true;
......@@ -146,49 +147,49 @@ public class RdbSyncService {
sync(dmls, dml -> {
if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
// DDL
columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
return false;
columnsTypeCache.remove(dml.getDestination() + "." + dml.getDatabase() + "." + dml.getTable());
return false;
} else {
// DML
String destination = StringUtils.trimToEmpty(dml.getDestination());
String groupId = StringUtils.trimToEmpty(dml.getGroupId());
String database = dml.getDatabase();
String table = dml.getTable();
Map<String, MappingConfig> configMap;
if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
} else {
// DML
String destination = StringUtils.trimToEmpty(dml.getDestination());
String groupId = StringUtils.trimToEmpty(dml.getGroupId());
String database = dml.getDatabase();
String table = dml.getTable();
Map<String, MappingConfig> configMap;
if (envProperties != null && !"tcp".equalsIgnoreCase(envProperties.getProperty("canal.conf.mode"))) {
configMap = mappingConfig.get(destination + "-" + groupId + "_" + database + "-" + table);
} else {
configMap = mappingConfig.get(destination + "_" + database + "-" + table);
}
configMap = mappingConfig.get(destination + "_" + database + "-" + table);
}
if (configMap == null) {
return false;
}
if (configMap == null) {
return false;
}
if (configMap.values().isEmpty()) {
return false;
}
if (configMap.values().isEmpty()) {
return false;
}
for (MappingConfig config : configMap.values()) {
if (config.getConcurrent()) {
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
singleDmls.forEach(singleDml -> {
int hash = pkHash(config.getDbMapping(), singleDml.getData());
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
} else {
int hash = 0;
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
singleDmls.forEach(singleDml -> {
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
}
for (MappingConfig config : configMap.values()) {
if (config.getConcurrent()) {
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
singleDmls.forEach(singleDml -> {
int hash = pkHash(config.getDbMapping(), singleDml.getData());
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
} else {
int hash = 0;
List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml);
singleDmls.forEach(singleDml -> {
SyncItem syncItem = new SyncItem(config, singleDml);
dmlsPartition[hash].add(syncItem);
});
}
return true;
}
});
return true;
}
} );
}
/**
......@@ -314,7 +315,7 @@ public class RdbSyncService {
for (String srcColumnName : old.keySet()) {
List<String> targetColumnNames = new ArrayList<>();
columnsMap.forEach((targetColumn, srcColumn) -> {
if (srcColumnName.toLowerCase().equals(srcColumn.toLowerCase())) {
if (srcColumnName.equalsIgnoreCase(srcColumn)) {
targetColumnNames.add(targetColumn);
}
});
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册