未验证 提交 3b618740 编写于 作者: A agapple 提交者: GitHub

Merge pull request #1607 from rewerma/master

......@@ -84,7 +84,7 @@ public class YmlConfigBinder {
try {
byte[] contentBytes;
if (charset == null) {
contentBytes = content.getBytes();
contentBytes = content.getBytes("UTF-8");
} else {
contentBytes = content.getBytes(charset);
......@@ -89,7 +89,7 @@ class RelaxedConversionService implements ConversionService {
return new RelaxedConversionService.StringToEnumIgnoringCaseConverterFactory.StringToEnum(enumType);
private class StringToEnum<T extends Enum> implements Converter<String, T> {
private static class StringToEnum<T extends Enum> implements Converter<String, T> {
private final Class<T> enumType;
......@@ -191,6 +191,7 @@ public abstract class YamlProcessor {
private boolean process(Map<String, Object> map, MatchCallback callback) {
Properties properties = new Properties() {
......@@ -6,6 +6,7 @@ import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Enumeration;
......@@ -27,7 +28,8 @@ import org.slf4j.LoggerFactory;
public class ExtensionLoader<T> {
private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
private static final Logger logger = LoggerFactory
private static final String SERVICES_DIRECTORY = "META-INF/services/";
......@@ -35,7 +37,8 @@ public class ExtensionLoader<T> {
private static final String DEFAULT_CLASSLOADER_POLICY = "internal";
private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
private static final Pattern NAME_SEPARATOR = Pattern
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
......@@ -171,7 +174,8 @@ public class ExtensionLoader<T> {
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: " + t.getMessage(), t);
+ ") could not be instantiated: " + t.getMessage(),
......@@ -191,7 +195,8 @@ public class ExtensionLoader<T> {
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: " + t.getMessage(), t);
+ ") could not be instantiated: " + t.getMessage(),
......@@ -233,7 +238,11 @@ public class ExtensionLoader<T> {
// button.
jarPath = jarPath.replaceAll("/classes/.*", "/classes/");
return Paths.get(jarPath).getParent().toString(); // Paths - from java 8
Path path = Paths.get(jarPath).getParent(); // Paths - from java 8
if (path != null) {
return path.toString();
return null;
private Map<String, Class<?>> loadExtensionClasses() {
......@@ -330,12 +339,10 @@ public class ExtensionLoader<T> {
// Class.forName(line, true,
// classLoader);
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: "
+ type
+ ", class line: "
+ clazz.getName()
+ "), class "
+ clazz.getName()
throw new IllegalStateException(
"Error when load extension class(interface: " + type
+ ", class line: " + clazz.getName()
+ "), class " + clazz.getName()
+ "is not subtype of interface.");
} else {
try {
......@@ -353,9 +360,9 @@ public class ExtensionLoader<T> {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension "
+ type.getName()
+ " name " + n + " on "
throw new IllegalStateException(
"Duplicate extension " + type.getName() + " name "
+ n + " on "
+ c.getName() + " and "
+ clazz.getName());
......@@ -365,12 +372,9 @@ public class ExtensionLoader<T> {
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: "
+ type
+ ", class line: "
+ line
+ ") in "
+ url
IllegalStateException e = new IllegalStateException(
"Failed to load extension class(interface: " + type + ", class line: "
+ line + ") in " + url
+ ", cause: "
+ t.getMessage(),
......@@ -385,13 +389,15 @@ public class ExtensionLoader<T> {
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " + type + ", class file: " + url
+ ") in " + url, t);
+ ") in " + url,
} // end of while urls
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName
+ ").", t);
"Exception when load extension class(interface: " + type + ", description file: " + fileName + ").",
......@@ -155,9 +155,11 @@ public class JdbcTypeUtil {
} else {
res = null;
case Types.CLOB:
res = value;
return res;
} catch (Exception e) {
......@@ -12,10 +12,10 @@ import java.util.Date;
public class Result implements Serializable {
private static final long serialVersionUID = -3276409502352405716L;
public Integer code = 20000;
public Object data;
public String message;
public Date sysTime;
private Integer code = 20000;
private Object data;
private String message;
private Date sysTime;
public static Result createSuccess(String message) {
Result result = new Result();
......@@ -2,11 +2,17 @@ package com.alibaba.otter.canal.client.adapter.support;
import java.io.File;
import java.net.URL;
import java.sql.*;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -29,38 +35,16 @@ public class Util {
* 通过DS执行sql
public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) throws SQLException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = ds.getConnection();
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
try (Connection conn = ds.getConnection();
Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);) {
rs = stmt.executeQuery(sql);
return fun.apply(rs);
} finally {
if (rs != null) {
try {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
if (stmt != null) {
try {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
if (conn != null) {
try {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
try (ResultSet rs = stmt.executeQuery(sql);) {
return fun.apply(rs);
} catch (Exception e) {
logger.error("sqlRs has error, sql: {} ", sql);
throw new RuntimeException(e);
......@@ -72,29 +56,10 @@ public class Util {
* @param consumer 回调方法
public static void sqlRS(Connection conn, String sql, Consumer<ResultSet> consumer) {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
} finally {
if (rs != null) {
try {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
if (stmt != null) {
try {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
......@@ -144,7 +109,41 @@ public class Util {
return column;
public static String timeZone; // 当前时区
public static ThreadPoolExecutor newFixedThreadPool(int nThreads, long keepAliveTime) {
return new ThreadPoolExecutor(nThreads,
new SynchronousQueue<>(),
(r, exe) -> {
if (!exe.isShutdown()) {
try {
} catch (InterruptedException e) {
// ignore
public static ThreadPoolExecutor newSingleThreadExecutor(long keepAliveTime) {
return new ThreadPoolExecutor(1,
new SynchronousQueue<>(),
(r, exe) -> {
if (!exe.isShutdown()) {
try {
} catch (InterruptedException e) {
// ignore
public final static String timeZone; // 当前时区
private static DateTimeZone dateTimeZone;
static {
......@@ -265,7 +264,7 @@ public class Util {
LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
} catch (Exception e) {
} catch (Throwable e) {
logger.error(e.getMessage(), e);
......@@ -40,6 +40,12 @@
......@@ -133,8 +133,8 @@ public class ESAdapter implements OuterAdapter {
esSyncService = new ESSyncService(esTemplate);
esConfigMonitor = new ESConfigMonitor();
} catch (Exception e) {
esConfigMonitor.init(this, envProperties);
} catch (Throwable e) {
throw new RuntimeException(e);
......@@ -31,7 +31,7 @@ public class ESSyncConfig {
throw new NullPointerException("esMapping._type");
if (esMapping._id == null && esMapping.getPk() == null) {
throw new NullPointerException("esMapping._id and esMapping.pk");
throw new NullPointerException("esMapping._id or esMapping.pk");
if (esMapping.sql == null) {
throw new NullPointerException("esMapping.sql");
......@@ -80,23 +80,23 @@ public class ESSyncConfig {
public static class ESMapping {
private String _index;
private String _type;
private String _id;
private boolean upsert = false;
private String pk;
// private String parent;
private String sql;
private String _index;
private String _type;
private String _id;
private boolean upsert = false;
private String pk;
private Map<String, RelationMapping> relations = new LinkedHashMap<>();
private String sql;
// 对象字段, 例: objFields:
// - _labels: array:;
private Map<String, String> objFields = new LinkedHashMap<>();
private List<String> skips = new ArrayList<>();
private int commitBatch = 1000;
private String etlCondition;
private boolean syncByTimestamp = false; // 是否按时间戳定时同步
private Long syncInterval; // 同步时间间隔
private Map<String, String> objFields = new LinkedHashMap<>();
private List<String> skips = new ArrayList<>();
private int commitBatch = 1000;
private String etlCondition;
private boolean syncByTimestamp = false; // 是否按时间戳定时同步
private Long syncInterval; // 同步时间间隔
private SchemaItem schemaItem; // sql解析结果模型
private SchemaItem schemaItem; // sql解析结果模型
public String get_index() {
return _index;
......@@ -154,6 +154,14 @@ public class ESSyncConfig {
this.skips = skips;
public Map<String, RelationMapping> getRelations() {
return relations;
public void setRelations(Map<String, RelationMapping> relations) {
this.relations = relations;
public String getSql() {
return sql;
......@@ -202,4 +210,26 @@ public class ESSyncConfig {
this.schemaItem = schemaItem;
public static class RelationMapping {
private String name;
private String parent;
public String getName() {
return name;
public void setName(String name) {
this.name = name;
public String getParent() {
return parent;
public void setParent(String parent) {
this.parent = parent;
......@@ -278,12 +278,13 @@ public class SchemaItem {
if (relationSelectFieldItems == null) {
synchronized (SchemaItem.class) {
if (relationSelectFieldItems == null) {
relationSelectFieldItems = new ArrayList<>();
List<FieldItem> relationSelectFieldItemsTmp = new ArrayList<>();
for (FieldItem fieldItem : schemaItem.getSelectFields().values()) {
if (fieldItem.getOwners().contains(getAlias())) {
relationSelectFieldItems = relationSelectFieldItemsTmp;
......@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.es.monitor;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -12,9 +13,9 @@ import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
import com.alibaba.otter.canal.client.adapter.es.ESAdapter;
import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
......@@ -31,10 +32,13 @@ public class ESConfigMonitor {
private ESAdapter esAdapter;
private Properties envProperties;
private FileAlterationMonitor fileMonitor;
public void init(ESAdapter esAdapter) {
public void init(ESAdapter esAdapter, Properties envProperties) {
this.esAdapter = esAdapter;
this.envProperties = envProperties;
File confDir = Util.getConfDirPath(adapterName);
try {
FileAlterationObserver observer = new FileAlterationObserver(confDir,
......@@ -65,11 +69,13 @@ public class ESConfigMonitor {
try {
// 加载新增的配置文件
String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
addConfigToCache(file, config);
logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
ESSyncConfig config = YmlConfigBinder
.bindYmlToObj(null, configContent, ESSyncConfig.class, null, envProperties);
if (config != null) {
addConfigToCache(file, config);
logger.info("Add a new es mapping config: {} to canal adapter", file.getName());
} catch (Exception e) {
logger.error(e.getMessage(), e);
......@@ -88,7 +94,11 @@ public class ESConfigMonitor {
ESSyncConfig config = new Yaml().loadAs(configContent, ESSyncConfig.class);
ESSyncConfig config = YmlConfigBinder
.bindYmlToObj(null, configContent, ESSyncConfig.class, null, envProperties);
if (config == null) {
if (esAdapter.getEsSyncConfig().containsKey(file.getName())) {
package com.alibaba.otter.canal.client.adapter.es.service;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
......@@ -30,10 +28,10 @@ import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
import com.alibaba.otter.canal.client.adapter.es.support.ESSyncUtil;
import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.google.common.base.Joiner;
......@@ -100,7 +98,7 @@ public class ESEtlService {
// 获取总数
String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
long cnt = (Long) ESSyncUtil.sqlRS(dataSource, countSql, rs -> {
long cnt = (Long) Util.sqlRS(dataSource, countSql, rs -> {
Long count = null;
try {
if (rs.next()) {
......@@ -116,8 +114,7 @@ public class ESEtlService {
if (cnt >= 10000) {
int threadCount = 3; // 从配置读取默认为3
long perThreadCnt = cnt / threadCount;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<Boolean>> futures = new ArrayList<>(threadCount);
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
for (int i = 0; i < threadCount; i++) {
long offset = i * perThreadCnt;
Long size = null;
......@@ -130,16 +127,13 @@ public class ESEtlService {
} else {
sqlFinal = sql + " LIMIT " + offset + "," + cnt;
Future<Boolean> future = executor
.submit(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
for (Future<Boolean> future : futures) {
executor.execute(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
while (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
// ignore
} else {
executeSqlImport(dataSource, sql, mapping, impCount, errMsg);
......@@ -176,7 +170,7 @@ public class ESEtlService {
private boolean executeSqlImport(DataSource ds, String sql, ESMapping mapping, AtomicLong impCount,
List<String> errMsg) {
try {
ESSyncUtil.sqlRS(ds, sql, rs -> {
Util.sqlRS(ds, sql, rs -> {
int count = 0;
try {
BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
......@@ -184,39 +178,73 @@ public class ESEtlService {
long batchBegin = System.currentTimeMillis();
while (rs.next()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
Object idVal = null;
for (FieldItem fieldItem : mapping.getSchemaItem().getSelectFields().values()) {
// 如果是主键字段则不插入
if (fieldItem.getFieldName().equals(mapping.get_id())) {
String fieldName = fieldItem.getFieldName();
if (mapping.getSkips().contains(fieldName)) {
Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
esFieldData.put(fieldName, val);
// 如果是主键字段则不插入
if (fieldItem.getFieldName().equals(mapping.get_id())) {
idVal = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
} else {
Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
esFieldData.put(Util.cleanColumn(fieldName), val);
Object idVal = null;
if (mapping.get_id() != null) {
idVal = rs.getObject(mapping.get_id());
if (!mapping.getRelations().isEmpty()) {
mapping.getRelations().forEach((relationField, relationMapping) -> {
Map<String, Object> relations = new HashMap<>();
relations.put("name", relationMapping.getName());
if (StringUtils.isNotEmpty(relationMapping.getParent())) {
FieldItem parentFieldItem = mapping.getSchemaItem()
Object parentVal;
try {
parentVal = esTemplate.getValFromRS(mapping,
} catch (SQLException e) {
throw new RuntimeException(e);
if (parentVal != null) {
relations.put("parent", parentVal.toString());
esFieldData.put("$parent_routing", parentVal.toString());
esFieldData.put(Util.cleanColumn(relationField), relations);
if (idVal != null) {
String parentVal = (String) esFieldData.remove("$parent_routing");
if (mapping.isUpsert()) {
UpdateRequestBuilder updateRequestBuilder = transportClient
.prepareUpdate(mapping.get_index(), mapping.get_type(), idVal.toString())
if (StringUtils.isNotEmpty(parentVal)) {
} else {
IndexRequestBuilder indexRequestBuilder = transportClient
.prepareIndex(mapping.get_index(), mapping.get_type(), idVal.toString())
if (StringUtils.isNotEmpty(parentVal)) {
} else {
idVal = rs.getObject(mapping.getPk());
idVal = esFieldData.get(mapping.getPk());
SearchResponse response = transportClient.prepareSearch(mapping.get_index())
.setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
......@@ -7,6 +7,7 @@ import java.util.Map;
import javax.sql.DataSource;
import com.alibaba.otter.canal.client.adapter.support.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,7 +44,7 @@ public class ESSyncService {
long begin = System.currentTimeMillis();
if (esSyncConfigs != null) {
if (logger.isTraceEnabled()) {
logger.trace("Destination: {}, database:{}, table:{}, type:{}, effect index count: {}",
logger.trace("Destination: {}, database:{}, table:{}, type:{}, affected index count: {}",
......@@ -65,7 +66,7 @@ public class ESSyncService {
if (logger.isTraceEnabled()) {
logger.trace("Sync elapsed time: {} ms, effect index count:{}, destination: {}",
logger.trace("Sync elapsed time: {} ms, affected indexes count:{}, destination: {}",
(System.currentTimeMillis() - begin),
......@@ -74,7 +75,7 @@ public class ESSyncService {
StringBuilder configIndexes = new StringBuilder();
.forEach(esSyncConfig -> configIndexes.append(esSyncConfig.getEsMapping().get_index()).append(" "));
logger.debug("DML: {} \nEffect indexes: {}",
logger.debug("DML: {} \nAffected indexes: {}",
JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue),
......@@ -166,7 +167,7 @@ public class ESSyncService {
esFieldData.put(fieldItem.getFieldName(), value);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
......@@ -295,7 +296,7 @@ public class ESSyncService {
esFieldData.put(fieldItem.getFieldName(), value);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
......@@ -407,7 +408,7 @@ public class ESSyncService {
// ------关联表简单字段更新为null------
Map<String, Object> esFieldData = new LinkedHashMap<>();
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
esFieldData.put(fieldItem.getFieldName(), null);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), null);
joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
} else {
......@@ -464,7 +465,7 @@ public class ESSyncService {
sql.replace("\n", " "));
ESSyncUtil.sqlRS(ds, sql, rs -> {
Util.sqlRS(ds, sql, rs -> {
try {
while (rs.next()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
......@@ -500,7 +501,7 @@ public class ESSyncService {
sql.replace("\n", " "));
ESSyncUtil.sqlRS(ds, sql, rs -> {
Util.sqlRS(ds, sql, rs -> {
try {
Map<String, Object> esFieldData = null;
if (mapping.getPk() != null) {
......@@ -508,7 +509,7 @@ public class ESSyncService {
esTemplate.getESDataFromDmlData(mapping, data, esFieldData);
for (String key : esFieldData.keySet()) {
esFieldData.put(key, null);
esFieldData.put(Util.cleanColumn(key), null);
while (rs.next()) {
......@@ -601,7 +602,7 @@ public class ESSyncService {
sql.toString().replace("\n", " "));
ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
Util.sqlRS(ds, sql.toString(), rs -> {
try {
while (rs.next()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
......@@ -617,7 +618,7 @@ public class ESSyncService {
esFieldData.put(fieldItem.getFieldName(), val);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
break out;
......@@ -628,7 +629,7 @@ public class ESSyncService {
esFieldData.put(fieldItem.getFieldName(), val);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
......@@ -693,7 +694,7 @@ public class ESSyncService {
sql.toString().replace("\n", " "));
ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
Util.sqlRS(ds, sql.toString(), rs -> {
try {
while (rs.next()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
......@@ -724,7 +725,7 @@ public class ESSyncService {
esFieldData.put(fieldItem.getFieldName(), val);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
......@@ -733,7 +734,7 @@ public class ESSyncService {
} else {
Object val = esTemplate
.getValFromRS(mapping, rs, fieldItem.getFieldName(), fieldItem.getFieldName());
esFieldData.put(fieldItem.getFieldName(), val);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), val);
......@@ -812,7 +813,7 @@ public class ESSyncService {
sql.replace("\n", " "));
ESSyncUtil.sqlRS(ds, sql, rs -> {
Util.sqlRS(ds, sql, rs -> {
try {
while (rs.next()) {
Map<String, Object> esFieldData = new LinkedHashMap<>();
......@@ -3,12 +3,9 @@ package com.alibaba.otter.canal.client.adapter.es.support;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.sql.Blob;
import java.sql.SQLException;
import java.util.*;
import java.util.Date;
import java.util.function.Function;
import javax.sql.DataSource;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.DateTime;
......@@ -234,8 +231,11 @@ public class ESSyncUtil {
private static byte[] blobToBytes(Blob blob) {
try (InputStream is = blob.getBinaryStream()) {
byte[] b = new byte[(int) blob.length()];
return b;
if (is.read(b) != -1) {
return b;
} else {
return new byte[0];
} catch (IOException | SQLException e) {
return null;
......@@ -297,46 +297,4 @@ public class ESSyncUtil {
sql.append(owner).append(".").append(columnName).append("=").append(value).append(" AND ");
* 执行查询sql
public static Object sqlRS(DataSource ds, String sql, Function<ResultSet, Object> fun) {
Connection conn = null;
Statement smt = null;
ResultSet rs = null;
try {
conn = ds.getConnection();
smt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
rs = smt.executeQuery(sql);
return fun.apply(rs);
} catch (SQLException e) {
logger.error("sqlRs has error, sql: {} ", sql);
throw new RuntimeException(e);
} finally {
if (rs != null) {
try {
} catch (SQLException e) {
logger.error("error to close result set");
if (smt != null) {
try {
} catch (SQLException e) {
logger.error("error to close statement");
if (conn != null) {
try {
} catch (SQLException e) {
logger.error("error to close db connection");
......@@ -2,6 +2,7 @@ package com.alibaba.otter.canal.client.adapter.es.support;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -9,10 +10,13 @@ import java.util.concurrent.ConcurrentMap;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
......@@ -29,6 +33,7 @@ import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem;
import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.ColumnItem;
import com.alibaba.otter.canal.client.adapter.es.config.SchemaItem.FieldItem;
import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
import com.alibaba.otter.canal.client.adapter.support.Util;
* ES 操作模板
......@@ -64,13 +69,24 @@ public class ESTemplate {
public void insert(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
if (mapping.get_id() != null) {
String parentVal = (String) esFieldData.remove("$parent_routing");
if (mapping.isUpsert()) {
getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
UpdateRequestBuilder updateRequestBuilder = transportClient
.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
if (StringUtils.isNotEmpty(parentVal)) {
} else {
getBulk().add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
IndexRequestBuilder indexRequestBuilder = transportClient
.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
if (StringUtils.isNotEmpty(parentVal)) {
} else {
......@@ -96,7 +112,9 @@ public class ESTemplate {
* @param esFieldData 数据Map
public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
append4Update(mapping, pkVal, esFieldData);
Map<String, Object> esFieldDataTmp = new LinkedHashMap<>(esFieldData.size());
esFieldData.forEach((k, v) -> esFieldDataTmp.put(Util.cleanColumn(k), v));
append4Update(mapping, pkVal, esFieldDataTmp);
......@@ -122,7 +140,7 @@ public class ESTemplate {
(fieldName, value) -> sql.append("_v.").append(fieldName).append("=").append(value).append(" AND "));
int len = sql.length();
sql.delete(len - 4, len);
Integer syncCount = (Integer) ESSyncUtil.sqlRS(ds, sql.toString(), rs -> {
Integer syncCount = (Integer) Util.sqlRS(ds, sql.toString(), rs -> {
int count = 0;
try {
while (rs.next()) {
......@@ -137,7 +155,7 @@ public class ESTemplate {
return count;
if (logger.isTraceEnabled()) {
logger.trace("Update ES by query effect {} records", syncCount);
logger.trace("Update ES by query affected {} records", syncCount);
......@@ -200,13 +218,24 @@ public class ESTemplate {
private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
if (mapping.get_id() != null) {
String parentVal = (String) esFieldData.remove("$parent_routing");
if (mapping.isUpsert()) {
getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
UpdateRequestBuilder updateRequestBuilder = transportClient
.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
if (StringUtils.isNotEmpty(parentVal)) {
} else {
getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
UpdateRequestBuilder updateRequestBuilder = transportClient
.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
if (StringUtils.isNotEmpty(parentVal)) {
} else {
SearchResponse response = transportClient.prepareSearch(mapping.get_index())
......@@ -223,6 +252,8 @@ public class ESTemplate {
public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
String columnName) throws SQLException {
fieldName = Util.cleanColumn(fieldName);
columnName = Util.cleanColumn(columnName);
String esType = getEsType(mapping, fieldName);
Object value = resultSet.getObject(columnName);
......@@ -254,9 +285,13 @@ public class ESTemplate {
if (!fieldItem.getFieldName().equals(mapping.get_id())
&& !mapping.getSkips().contains(fieldItem.getFieldName())) {
esFieldData.put(fieldItem.getFieldName(), value);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
// 添加父子文档关联信息
putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
return resultIdVal;
......@@ -288,12 +323,16 @@ public class ESTemplate {
for (ColumnItem columnItem : fieldItem.getColumnItems()) {
if (dmlOld.containsKey(columnItem.getColumnName())
&& !mapping.getSkips().contains(fieldItem.getFieldName())) {
getValFromRS(mapping, resultSet, fieldItem.getFieldName(), fieldItem.getFieldName()));
// 添加父子文档关联信息
putRelationDataFromRS(mapping, schemaItem, resultSet, esFieldData);
return resultIdVal;
......@@ -337,9 +376,12 @@ public class ESTemplate {
if (!fieldItem.getFieldName().equals(mapping.get_id())
&& !mapping.getSkips().contains(fieldItem.getFieldName())) {
esFieldData.put(fieldItem.getFieldName(), value);
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
// 添加父子文档关联信息
putRelationData(mapping, schemaItem, dmlData, esFieldData);
return resultIdVal;
......@@ -364,13 +406,67 @@ public class ESTemplate {
if (dmlOld.containsKey(columnName) && !mapping.getSkips().contains(fieldItem.getFieldName())) {
getValFromData(mapping, dmlData, fieldItem.getFieldName(), columnName));
// 添加父子文档关联信息
putRelationData(mapping, schemaItem, dmlOld, esFieldData);
return resultIdVal;
private void putRelationDataFromRS(ESMapping mapping, SchemaItem schemaItem, ResultSet resultSet,
Map<String, Object> esFieldData) {
// 添加父子文档关联信息
if (!mapping.getRelations().isEmpty()) {
mapping.getRelations().forEach((relationField, relationMapping) -> {
Map<String, Object> relations = new HashMap<>();
relations.put("name", relationMapping.getName());
if (StringUtils.isNotEmpty(relationMapping.getParent())) {
FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
Object parentVal;
try {
parentVal = getValFromRS(mapping,
} catch (SQLException e) {
throw new RuntimeException(e);
if (parentVal != null) {
relations.put("parent", parentVal.toString());
esFieldData.put("$parent_routing", parentVal.toString());
esFieldData.put(relationField, relations);
private void putRelationData(ESMapping mapping, SchemaItem schemaItem, Map<String, Object> dmlData,
Map<String, Object> esFieldData) {
// 添加父子文档关联信息
if (!mapping.getRelations().isEmpty()) {
mapping.getRelations().forEach((relationField, relationMapping) -> {
Map<String, Object> relations = new HashMap<>();
relations.put("name", relationMapping.getName());
if (StringUtils.isNotEmpty(relationMapping.getParent())) {
FieldItem parentFieldItem = schemaItem.getSelectFields().get(relationMapping.getParent());
String columnName = parentFieldItem.getColumnItems().iterator().next().getColumnName();
Object parentVal = getValFromData(mapping, dmlData, parentFieldItem.getFieldName(), columnName);
if (parentVal != null) {
relations.put("parent", parentVal.toString());
esFieldData.put("$parent_routing", parentVal.toString());
esFieldData.put(relationField, relations);
* es 字段类型本地缓存
dataSourceKey: defaultDS
destination: example
groupId: g1
_index: customer
_type: _doc
_id: _id
name: order
parent: customer_id
sql: "select concat('oid_', t.id) as _id,
t.id as order_id,
t.serial_code as order_serial,
t.c_time as order_time
from biz_order t"
- customer_id
etlCondition: "where t.c_time>='{0}'"
commitBatch: 3000
dataSourceKey: defaultDS
destination: example
groupId: g1
_index: customer
_type: _doc
_id: id
name: customer
sql: "select t.id, t.name, t.email from customer t"
etlCondition: "where t.c_time>='{0}'"
commitBatch: 3000
# "mappings":{
# "_doc":{
# "properties":{
# "id": {
# "type": "long"
# },
# "name": {
# "type": "text"
# },
# "email": {
# "type": "text"
# },
# "order_id": {
# "type": "long"
# },
# "order_serial": {
# "type": "text"
# },
# "order_time": {
# "type": "date"
# },
# "customer_order":{
# "type":"join",
# "relations":{
# "customer":"order"
# }
# }
# }
# }
# }
......@@ -23,6 +23,7 @@ public class ConfigLoadTest {
public void testLoad() {
Map<String, ESSyncConfig> configMap = ESSyncConfigLoader.load(null);
ESSyncConfig config = configMap.get("mytest_user.yml");
Assert.assertEquals("defaultDS", config.getDataSourceKey());
ESSyncConfig.ESMapping esMapping = config.getEsMapping();
package com.alibaba.otter.canal.client.adapter.es.test;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ESTest {
private TransportClient transportClient;
public void init() throws UnknownHostException {
Settings.Builder settingBuilder = Settings.builder();
settingBuilder.put("cluster.name", TestConstant.clusterName);
Settings settings = settingBuilder.build();
transportClient = new PreBuiltTransportClient(settings);
String[] hostArray = TestConstant.esHosts.split(",");
for (String host : hostArray) {
int i = host.indexOf(":");
transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)),
Integer.parseInt(host.substring(i + 1))));
public void test01() {
SearchResponse response = transportClient.prepareSearch("test")
.setQuery(QueryBuilders.termQuery("_id", "1"))
for (SearchHit hit : response.getHits()) {
public void test02() {
Map<String, Object> esFieldData = new LinkedHashMap<>();
esFieldData.put("userId", 2L);
esFieldData.put("eventId", 4L);
esFieldData.put("eventName", "网络异常");
esFieldData.put("description", "第四个事件信息");
Map<String, Object> relations = new LinkedHashMap<>();
esFieldData.put("user_event", relations);
relations.put("name", "event");
relations.put("parent", "2");
BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
.add(transportClient.prepareIndex("test", "osm", "2_4").setRouting("2").setSource(esFieldData));
public void test03() {
Map<String, Object> esFieldData = new LinkedHashMap<>();
esFieldData.put("userId", 2L);
esFieldData.put("eventName", "网络异常1");
Map<String, Object> relations = new LinkedHashMap<>();
esFieldData.put("user_event", relations);
relations.put("name", "event");
relations.put("parent", "2");
BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
bulkRequestBuilder.add(transportClient.prepareUpdate("test", "osm", "2_4").setRouting("2").setDoc(esFieldData));
public void test04() {
BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
bulkRequestBuilder.add(transportClient.prepareDelete("test", "osm", "2_4"));
private void commit(BulkRequestBuilder bulkRequestBuilder) {
if (bulkRequestBuilder.numberOfActions() > 0) {
BulkResponse response = bulkRequestBuilder.execute().actionGet();
if (response.hasFailures()) {
for (BulkItemResponse itemResponse : response.getItems()) {
if (!itemResponse.isFailed()) {
if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
} else {
System.out.println("ES bulk commit error" + itemResponse.getFailureMessage());
public void after() {
......@@ -11,9 +11,9 @@ public class TestConstant {
public final static String jdbcPassword = "121212";
public final static String esHosts = "";
public final static String clusterNmae = "elasticsearch";
public final static String clusterName = "elasticsearch";
public static DruidDataSource dataSource;
public final static DruidDataSource dataSource;
static {
dataSource = new DruidDataSource();
......@@ -22,7 +22,7 @@ public class Common {
Map<String, String> properties = new HashMap<>();
properties.put("cluster.name", TestConstant.clusterNmae);
properties.put("cluster.name", TestConstant.clusterName);
ESAdapter esAdapter = new ESAdapter();
......@@ -96,7 +96,7 @@ public class HbaseAdapter implements OuterAdapter {
hbaseSyncService = new HbaseSyncService(hbaseTemplate);
configMonitor = new HbaseConfigMonitor();
configMonitor.init(this, envProperties);
} catch (Exception e) {
throw new RuntimeException(e);
......@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.hbase.monitor;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
......@@ -11,8 +12,8 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
import com.alibaba.otter.canal.client.adapter.hbase.HbaseAdapter;
import com.alibaba.otter.canal.client.adapter.hbase.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.support.MappingConfigsLoader;
......@@ -26,10 +27,13 @@ public class HbaseConfigMonitor {
private HbaseAdapter hbaseAdapter;
private Properties envProperties;
private FileAlterationMonitor fileMonitor;
public void init(HbaseAdapter hbaseAdapter) {
public void init(HbaseAdapter hbaseAdapter, Properties envProperties) {
this.hbaseAdapter = hbaseAdapter;
this.envProperties = envProperties;
File confDir = Util.getConfDirPath(adapterName);
try {
FileAlterationObserver observer = new FileAlterationObserver(confDir,
......@@ -60,7 +64,11 @@ public class HbaseConfigMonitor {
try {
// 加载新增的配置文件
String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
MappingConfig config = YmlConfigBinder
.bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
if (config == null) {
addConfigToCache(file, config);
......@@ -83,7 +91,11 @@ public class HbaseConfigMonitor {
MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
MappingConfig config = YmlConfigBinder
.bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
if (config == null) {
if (hbaseAdapter.getHbaseMapping().containsKey(file.getName())) {
......@@ -20,9 +20,9 @@ import com.alibaba.otter.canal.client.adapter.support.Dml;
public class HbaseSyncService {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private static Logger logger = LoggerFactory.getLogger(HbaseSyncService.class);
private HbaseTemplate hbaseTemplate; // HBase操作模板
private HbaseTemplate hbaseTemplate; // HBase操作模板
public HbaseSyncService(HbaseTemplate hbaseTemplate){
this.hbaseTemplate = hbaseTemplate;
......@@ -140,7 +140,7 @@ public class HbaseSyncService {
Integer.parseInt((String) entry.getValue()));
bytes = Bytes.toBytes(v);
} catch (Exception e) {
// ignore
logger.error(e.getMessage(), e);
......@@ -7,12 +7,10 @@ import java.math.RoundingMode;
import java.sql.Timestamp;
import java.util.Date;
import com.alibaba.otter.canal.client.adapter.support.Util;
import org.apache.hadoop.hbase.util.Bytes;
import org.joda.time.DateTime;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.google.common.math.LongMath;
import org.joda.time.DateTimeZone;
* Phoenix类型转换工具类
......@@ -27,7 +27,7 @@ public class EtlLock {
private static final Map<String, InterProcessMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>();
private static Mode mode = Mode.LOCAL;
private Mode mode = Mode.LOCAL;
private CuratorClient curatorClient;
......@@ -34,7 +34,7 @@ public class SyncSwitch {
private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>();
private static Mode mode = Mode.LOCAL;
private Mode mode = Mode.LOCAL;
private AdapterCanalConfig adapterCanalConfig;
......@@ -165,20 +165,20 @@ public class SyncSwitch {
public Boolean status(String destination) {
public boolean status(String destination) {
if (mode == Mode.LOCAL) {
BooleanMutex mutex = LOCAL_LOCK.get(destination);
if (mutex != null) {
return mutex.state();
} else {
return null;
return false;
} else {
BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
if (mutex != null) {
return mutex.state();
} else {
return null;
return false;
......@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
......@@ -17,6 +16,7 @@ import com.alibaba.otter.canal.client.adapter.OuterAdapter;
import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
import com.alibaba.otter.canal.client.adapter.support.Dml;
import com.alibaba.otter.canal.client.adapter.support.MessageUtil;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
......@@ -43,7 +43,7 @@ public abstract class AbstractCanalAdapterWorker {
public AbstractCanalAdapterWorker(List<List<OuterAdapter>> canalOuterAdapters){
this.canalOuterAdapters = canalOuterAdapters;
this.groupInnerExecutorService = Executors.newFixedThreadPool(canalOuterAdapters.size());
this.groupInnerExecutorService = Util.newFixedThreadPool(canalOuterAdapters.size(), 5000L);
syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
......@@ -2,13 +2,13 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.WakeupException;
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
......@@ -44,12 +44,12 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
protected void process() {
while (!running) {
try {
} catch (InterruptedException e) {
// ignore
ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
ExecutorService workerExecutor = Util.newSingleThreadExecutor(5000L);
int retry = canalClientConfig.getRetries() == null
|| canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
......@@ -63,8 +63,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
while (running) {
Boolean status = syncSwitch.status(canalDestination);
if (status != null && !status) {
boolean status = syncSwitch.status(canalDestination);
if (!status) {
......@@ -85,6 +85,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
try {
} catch (WakeupException e) {
......@@ -5,7 +5,7 @@ import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
......@@ -176,40 +176,32 @@ public class CanalAdapterLoader {
public void destroy() {
if (!canalWorkers.isEmpty()) {
ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
List<Future<Boolean>> futures = new ArrayList<>();
for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) {
futures.add(stopExecutorService.submit(() -> {
return true;
futures.forEach(future -> {
try {
} catch (Exception e) {
try {
while (!stopExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
// ignore
} catch (InterruptedException e) {
// ignore
if (!canalMQWorker.isEmpty()) {
ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size());
List<Future<Boolean>> futures = new ArrayList<>();
for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) {
futures.add(stopMQWorkerService.submit(() -> {
return true;
futures.forEach(future -> {
try {
} catch (Exception e) {
try {
while (!stopMQWorkerService.awaitTermination(1, TimeUnit.SECONDS)) {
// ignore
} catch (InterruptedException e) {
// ignore
logger.info("All canal adapters destroyed");
......@@ -2,12 +2,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.common.errors.WakeupException;
import com.alibaba.otter.canal.client.adapter.OuterAdapter;
import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
import com.alibaba.otter.canal.client.adapter.support.Util;
import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
......@@ -49,8 +49,9 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
ExecutorService workerExecutor = Util.newSingleThreadExecutor(5000L);
int retry = canalClientConfig.getRetries() == null
|| canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
while (running) {
......@@ -62,8 +63,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
while (running) {
Boolean status = syncSwitch.status(canalDestination);
if (status != null && !status) {
boolean status = syncSwitch.status(canalDestination);
if (!status) {
......@@ -84,6 +85,8 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
try {
} catch (WakeupException e) {
......@@ -140,7 +140,7 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
} catch (Exception e) {
} catch (Throwable e) {
logger.error("process error!", e);
} finally {
package com.alibaba.otter.canal.adapter.launcher.monitor;
import java.io.File;
import java.io.FileReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.annotation.PostConstruct;
......@@ -68,7 +70,7 @@ public class ApplicationConfigMonitor {
try {
// 检查yml格式
new Yaml().loadAs(new FileReader(file), Map.class);
new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class);
package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
import java.io.FileWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -35,7 +37,7 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
private DruidDataSource dataSource;
private static volatile long currentConfigTimestamp = 0;
private volatile long currentConfigTimestamp = 0;
private Map<String, ConfigItem> remoteAdapterConfigs = new MapMaker().makeMap();
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(2,
......@@ -115,7 +117,10 @@ public class DbRemoteConfigLoader implements RemoteConfigLoader {
* @param content 文件内容
private void overrideLocalCanalConfig(String content) {
try (FileWriter writer = new FileWriter(CommonUtils.getConfPath() + "application.yml")) {
try (OutputStreamWriter writer = new OutputStreamWriter(
new FileOutputStream(CommonUtils.getConfPath() + "application.yml"),
StandardCharsets.UTF_8)) {
} catch (Exception e) {
package com.alibaba.otter.canal.adapter.launcher.monitor.remote;
import com.alibaba.otter.canal.common.utils.CommonUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import com.alibaba.otter.canal.common.utils.CommonUtils;
* 远程配置监听器实现
......@@ -35,7 +38,9 @@ public class RemoteAdapterMonitorImpl implements RemoteAdapterMonitor {
String name = configItem.getName();
try (FileWriter writer = new FileWriter(confPath + category + "/" + configItem.getName())) {
try (OutputStreamWriter writer = new OutputStreamWriter(
new FileOutputStream(confPath + category + "/" + configItem.getName()),
StandardCharsets.UTF_8)) {
logger.info("## Loaded remote adapter config: {}/{}", category, name);
......@@ -69,16 +69,16 @@ public class CommonRest {
try {
Boolean oriSwitchStatus;
boolean oriSwitchStatus;
if (destination != null) {
oriSwitchStatus = syncSwitch.status(destination);
if (oriSwitchStatus != null && oriSwitchStatus) {
if (oriSwitchStatus) {
} else {
// task可能为destination,直接锁task
oriSwitchStatus = syncSwitch.status(task);
if (oriSwitchStatus != null && oriSwitchStatus) {
if (oriSwitchStatus) {
......@@ -89,9 +89,9 @@ public class CommonRest {
return adapter.etl(task, paramArray);
} finally {
if (destination != null && oriSwitchStatus != null && oriSwitchStatus) {
if (destination != null && oriSwitchStatus) {
} else if (destination == null && oriSwitchStatus != null && oriSwitchStatus) {
} else if (destination == null && oriSwitchStatus) {
......@@ -102,7 +102,7 @@ public class CommonRest {
* ETL curl -X POST
* @param type 类型 hbase, es
* @param task 任务名对应配置文件名 mytest_person2.yml
* @param params etl where条件参数, 为空全部导入
......@@ -129,7 +129,7 @@ public class CommonRest {
* 统计总数 curl
* @param type 类型 hbase, es
* @param task 任务名对应配置文件名 mytest_person2.yml
* @return
......@@ -148,11 +148,11 @@ public class CommonRest {
Set<String> destinations = adapterCanalConfig.DESTINATIONS;
for (String destination : destinations) {
Map<String, String> resMap = new LinkedHashMap<>();
Boolean status = syncSwitch.status(destination);
String resStatus = "none";
if (status != null && status) {
boolean status = syncSwitch.status(destination);
String resStatus;
if (status) {
resStatus = "on";
} else if (status != null && !status) {
} else {
resStatus = "off";
resMap.put("destination", destination);
......@@ -164,7 +164,7 @@ public class CommonRest {
* 实例同步开关 curl -X PUT
* @param destination 实例名称
* @param status 开关状态: off on
* @return
......@@ -189,17 +189,17 @@ public class CommonRest {
* 获取实例开关状态 curl
* @param destination 实例名称
* @return
public Map<String, String> etl(@PathVariable String destination) {
Boolean status = syncSwitch.status(destination);
String resStatus = "none";
if (status != null && status) {
boolean status = syncSwitch.status(destination);
String resStatus;
if (status) {
resStatus = "on";
} else if (status != null && !status) {
} else {
resStatus = "off";
Map<String, String> res = new LinkedHashMap<>();
......@@ -146,7 +146,7 @@ public class RdbAdapter implements OuterAdapter {
rdbConfigMonitor = new RdbConfigMonitor();
rdbConfigMonitor.init(configuration.getKey(), this);
rdbConfigMonitor.init(configuration.getKey(), this, envProperties);
package com.alibaba.otter.canal.client.adapter.rdb.config;
import org.apache.commons.lang.StringUtils;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
* RDB表映射配置
......@@ -21,7 +21,7 @@ public class MappingConfig {
private String outerAdapterKey; // 对应适配器的key
private Boolean concurrent; // 是否并行同步
private boolean concurrent = false; // 是否并行同步
private DbMapping dbMapping; // db映射配置
......@@ -49,11 +49,11 @@ public class MappingConfig {
this.outerAdapterKey = outerAdapterKey;
public Boolean getConcurrent() {
return concurrent == null ? false : concurrent;
public boolean getConcurrent() {
return concurrent;
public void setConcurrent(Boolean concurrent) {
public void setConcurrent(boolean concurrent) {
this.concurrent = concurrent;
......@@ -87,11 +87,11 @@ public class MappingConfig {
public static class DbMapping {
private Boolean mirrorDb = false; // 是否镜像库
private boolean mirrorDb = false; // 是否镜像库
private String database; // 数据库名或schema名
private String table; // 表名
private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
private Boolean mapAll = false; // 映射所有字段
private boolean mapAll = false; // 映射所有字段
private String targetDb; // 目标库名
private String targetTable; // 目标表名
private Map<String, String> targetColumns; // 目标表字段映射
......@@ -103,11 +103,11 @@ public class MappingConfig {
private Map<String, String> allMapColumns;
public Boolean getMirrorDb() {
return mirrorDb == null ? false : mirrorDb;
public boolean getMirrorDb() {
return mirrorDb;
public void setMirrorDb(Boolean mirrorDb) {
public void setMirrorDb(boolean mirrorDb) {
this.mirrorDb = mirrorDb;
......@@ -3,6 +3,7 @@ package com.alibaba.otter.canal.client.adapter.rdb.monitor;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
......@@ -11,8 +12,8 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import com.alibaba.otter.canal.client.adapter.config.YmlConfigBinder;
import com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter;
import com.alibaba.otter.canal.client.adapter.rdb.config.MappingConfig;
import com.alibaba.otter.canal.client.adapter.rdb.config.MirrorDbConfig;
......@@ -29,11 +30,14 @@ public class RdbConfigMonitor {
private RdbAdapter rdbAdapter;
private Properties envProperties;
private FileAlterationMonitor fileMonitor;
public void init(String key, RdbAdapter rdbAdapter) {
public void init(String key, RdbAdapter rdbAdapter, Properties envProperties) {
this.key = key;
this.rdbAdapter = rdbAdapter;
this.envProperties = envProperties;
File confDir = Util.getConfDirPath(adapterName);
try {
FileAlterationObserver observer = new FileAlterationObserver(confDir,
......@@ -64,7 +68,11 @@ public class RdbConfigMonitor {
try {
// 加载新增的配置文件
String configContent = MappingConfigsLoader.loadConfig(adapterName + File.separator + file.getName());
MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
MappingConfig config = YmlConfigBinder
.bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
if (config == null) {
if ((key == null && config.getOuterAdapterKey() == null)
|| (key != null && key.equals(config.getOuterAdapterKey()))) {
......@@ -90,7 +98,11 @@ public class RdbConfigMonitor {
MappingConfig config = new Yaml().loadAs(configContent, MappingConfig.class);
MappingConfig config = YmlConfigBinder
.bindYmlToObj(null, configContent, MappingConfig.class, null, envProperties);
if (config == null) {
if ((key == null && config.getOuterAdapterKey() == null)
|| (key != null && key.equals(config.getOuterAdapterKey()))) {
......@@ -6,8 +6,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
......@@ -78,8 +77,7 @@ public class RdbEtlService {
if (cnt >= 10000) {
int threadCount = 3;
long perThreadCnt = cnt / threadCount;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<Boolean>> futures = new ArrayList<>(threadCount);
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
for (int i = 0; i < threadCount; i++) {
long offset = i * perThreadCnt;
Long size = null;
......@@ -92,16 +90,14 @@ public class RdbEtlService {
} else {
sqlFinal = sql + " LIMIT " + offset + "," + cnt;
Future<Boolean> future = executor
.submit(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
for (Future<Boolean> future : futures) {
.execute(() -> executeSqlImport(srcDS, targetDS, sqlFinal, dbMapping, successCount, errMsg));
while (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
// ignore
} else {
executeSqlImport(srcDS, targetDS, sql.toString(), dbMapping, successCount, errMsg);
......@@ -92,17 +92,16 @@ public class DBTest {
private String clob2Str(Clob clob) {
String content = "";
try {
Reader is = clob.getCharacterStream();
BufferedReader buff = new BufferedReader(is);
try (Reader is = clob.getCharacterStream(); BufferedReader buff = new BufferedReader(is)) {
String line = buff.readLine();
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
while (line != null) {
line = buff.readLine();
content = sb.toString();
} catch (Exception e) {
return content;
......@@ -10,7 +10,7 @@ public class TestConstant {
public final static String jdbcUser = "root";
public final static String jdbcPassword = "121212";
public static DruidDataSource dataSource;
public final static DruidDataSource dataSource;
static {
dataSource = new DruidDataSource();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册