提交 fd2a3231 编写于 作者: FelixHPP's avatar FelixHPP

build: 1.0.9; 优化ES6.x 和7.x中type问题, 优化routing设置问题

上级 ec4a8f9c
......@@ -5,7 +5,7 @@
<parent>
<artifactId>easy-es</artifactId>
<groupId>indi.felix.easy</groupId>
<version>1.0.6</version>
<version>1.0.8</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>easy-es-core</artifactId>
......@@ -65,7 +65,6 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<!-- <version>4.12</version>-->
<scope>test</scope>
</dependency>
<dependency>
......@@ -75,13 +74,11 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<!-- <version>2.10.0</version>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<!-- <version>2.10.0</version>-->
</dependency>
<dependency>
<groupId>commons-logging</groupId>
......
......@@ -182,6 +182,19 @@ public class EasyEs {
// return responseBody(defaultMethod, newEndpoint);
// }
public void bulkAddDoc(String index, List<Map> objects, String idKey, String routingKey, String parentKey) {
BulkRequest request = new BulkRequest();
if(!rest().indexExists(index)){
throw new ElasticsearchException("index is not exists");
}
if (objects == null || objects.size() < 1) {
throw new ElasticsearchException("mappers can not be empty");
}
search().bulkInsert(index, objects, 2000);
}
public void bulkAddDoc(String index, List<Map> objects) {
BulkRequest request = new BulkRequest();
......
......@@ -12,7 +12,6 @@ import indi.felix.easy.core.elastic.utool.DelEsListener;
import indi.felix.easy.core.elastic.utool.EsUtils;
import indi.felix.easy.core.exceptions.ElasticsearchException;
import indi.felix.easy.core.metadata.RuntimeElasticMetaData;
import indi.felix.easy.core.utils.FastJsonUtils;
import javafx.util.Pair;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -96,7 +95,7 @@ public abstract class AbstractService {
* @param list
* @return
*/
protected ArrayList<Pair<String, Object>> insertBatch(String index, List<Map> list, int batchSize) {
protected ArrayList<Pair<String, Object>> insertBatch(String index, List<Map> list, int batchSize, String idKey, String routingKey, String parentKey) {
ArrayList<Pair<String, Object>> failure = new ArrayList<>();
if (batchSize <= 0 || batchSize > 2000) {
batchSize = 2000;
......@@ -111,18 +110,20 @@ public abstract class AbstractService {
String json = JSON.toJSONString(obj);
if (StringUtils.isNotEmpty(json)) {
if (cursor < batchSize) {
IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE);
// IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE);
IndexRequest indexRequest = new IndexRequest(index);
// 设置ID, routing , parent 等字段
EsUtils.setDefaultProperties(indexRequest, json);
EsUtils.setDefaultProperties(indexRequest, json, idKey, routingKey, parentKey);
request.add(indexRequest.create(false).source(json, XContentType.JSON));
} else {
everyBatchWrapper(restHighLevelClient, request, failure);
cursor = 0;
request = new BulkRequest();
IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE);
// IndexRequest indexRequest = new IndexRequest(index, DOC_TYPE);
IndexRequest indexRequest = new IndexRequest(index);
// 设置ID, routing , parent 等字段
EsUtils.setDefaultProperties(indexRequest, json);
EsUtils.setDefaultProperties(indexRequest, json, idKey, routingKey, parentKey);
request.add(indexRequest.create(false).source(json, XContentType.JSON));
}
cursor++;
......
......@@ -107,7 +107,11 @@ public class EasyEsService extends AbstractService {
* @return ArrayList
*/
public ArrayList<Pair<String, Object>> bulkInsert(String index, List<Map> list, int batchSize) {
return insertBatch(index, list, batchSize);
return insertBatch(index, list, batchSize, null, null, null);
}
public ArrayList<Pair<String, Object>> bulkInsert(String index, List<Map> list, int batchSize, String idKey, String routingKey, String parentKey) {
return insertBatch(index, list, batchSize, idKey, routingKey, parentKey);
}
......
......@@ -16,7 +16,8 @@ import java.util.Map;
@Data
public class SettingModel {
/**
* 分片数, 默认5, static
* 分片数, 默认1, static
* 默认分片数将从5变为1在7.x版本,如想使用默认的5可以新建带有索引模板的索引请求
*/
private int shards;
......@@ -40,7 +41,7 @@ public class SettingModel {
private int totalFields;
public SettingModel() {
this.shards = 5;
this.shards = 1;
this.replicas = 1;
this.totalFields = 5000;
// this.refreshInterval = 1s
......
......@@ -83,9 +83,6 @@ public class DocRestApi extends BaseRest {
return addDoc(index, params, map);
}
// public String updateByQuery() {
//
// }
private String addDoc(String index, Map<String, String> params, Map map) {
Args.notNull(index, "index");
......@@ -104,20 +101,22 @@ public class DocRestApi extends BaseRest {
return true;
}
String newEndpoint = null;
if (id.length == 1) {
newEndpoint = MessageFormat.format("/{0}/{1}", index, id[0]);
Response response = res(DELETE, newEndpoint, Collections.EMPTY_MAP, null);
int code = response.getStatusLine().getStatusCode();
IdsQueryBuilder idsQueryBuilder = new IdsQueryBuilder();
idsQueryBuilder.addIds(id);
return deleteDocByQuery(index, idsQueryBuilder);
// if (id.length == 1) {
// newEndpoint = MessageFormat.format("/{0}/{1}", index, id[0]);
// Response response = res(DELETE, newEndpoint, Collections.EMPTY_MAP, null);
// int code = response.getStatusLine().getStatusCode();
//
// return code == 200;
return code == 200;
} else {
// } else {
// todo 待完善
// newEndpoint = MessageFormat.format("/{0}/{1}/{2}", index, DOC_TYPE, "_delete_by_query");
IdsQueryBuilder idsQueryBuilder = new IdsQueryBuilder();
idsQueryBuilder.addIds(id);
return deleteDocByQuery(index, idsQueryBuilder);
}
// }
}
public boolean deleteDocByQuery(String index, QueryBuilder queryBuilder) {
......
......@@ -151,30 +151,33 @@ public class EasyRest extends BaseRest {
try {
Map<String, Map> map = new HashMap<>();
if (setting != null && setting.size() > 0) {
map.put("settings", setting);
} else {
//默认分片数将从5变为1在7.x版本,如想使用默认的5可以新建带有索引模板的索引请求
SettingModel settingModel = new SettingModel();
setting = settingModel.getSettingMap();
map.put("settings", setting);
}
// 创建默认mapping
Map<String, Map> mappingMap = new HashMap<>();
// 考虑需要使用join查询, 索引mapping中_routing的required必须设置为true。
if (mapping != null && mapping.size() > 0) {
if (mapping.containsKey("properties")) {
// 设置type
Map<String, Map> mappingMap = new HashMap<>();
mappingMap.put(DOC_TYPE, mapping);
map.put("mappings", mappingMap);
} else {
map.put("mappings", mapping);
mappingMap = mapping;
}
} else {
// 创建默认mapping
Map<String, Map> mappingMap = new HashMap<>();
mappingMap.put(DOC_TYPE, Collections.EMPTY_MAP);
map.put("mappings", mappingMap);
}
map.put("mappings", mappingMap);
String jsonString = JSON.toJSONString(map);
String newEndpoint = index + "?include_type_name=true";
Request request = new Request(
"PUT",
index);
newEndpoint);
request.setEntity(new NStringEntity(
jsonString,
ContentType.APPLICATION_JSON));
......
......@@ -38,7 +38,8 @@ public class IndexRestApi extends BaseRest {
* @return
*/
public boolean exists(String index) {
Response response = res(HEAD, index, Collections.EMPTY_MAP, null);
String newEndpoint = index + "?include_type_name=true";
Response response = res(HEAD, newEndpoint, Collections.EMPTY_MAP, null);
int code = response.getStatusLine().getStatusCode();
return code == 200;
}
......
......@@ -48,15 +48,17 @@ public class SearchRestApi extends BaseRest {
/**
* 查询指定ID的文档
*
* 如果索引设置了routing, 会报routing_missing_exception 异常
* @param index 多个索引查询用逗号分割
* @param id
* @return
*/
public String requestById(String index, String id) {
String newEndpoint = MessageFormat.format("/{0}/{1}", index, id);
return responseBody(defaultMethod, newEndpoint);
String newEndpoint = MessageFormat.format("/{0}/{1}/{2}", index, DOC_TYPE, id);
//?routing=key1
// 采用index/_doc/id 这种api 可能会有routing的问题,
// return responseBody(GET, newEndpoint);
return requestByIds(index, id);
}
/**
......
......@@ -8,12 +8,15 @@ public class Const {
*/
public static final String DOC_TYPE = "_doc";
public static final String UID = "_id";
/**
* 默认的ID字段, 对象中如果有该字段, 默认存储ES将按该字段设置ID
*/
public static final String ID = "id";
public static final String UID = "_id";
/**
* 默认的routing字段, 同ID
*/
......
......@@ -9,37 +9,63 @@ import java.util.Map;
public class EsUtils {
public static IndexRequest setDefaultProperties(IndexRequest indexRequest, Object object) {
public static IndexRequest setDefaultProperties(IndexRequest indexRequest, Object object, String idKey, String routingKey, String parentKey) {
JSONObject jsonObject = null;
if (object instanceof JSONObject) {
jsonObject = (JSONObject) object;
} else if(object instanceof String){
} else if (object instanceof String) {
jsonObject = JSON.parseObject(object.toString());
} else {
jsonObject = (JSONObject) JSONObject.toJSON(object);
}
if (jsonObject.containsKey(Const.ID)) {
String id = jsonObject.getString(Const.ID);
if (StringUtils.isNotBlank(idKey)) {
String id = jsonObject.getString(idKey);
if (StringUtils.isNotBlank(id)) {
indexRequest.id(id);
}
}
if (jsonObject.containsKey(Const.ROUTING)) {
String routing = jsonObject.getString(Const.ROUTING);
if (StringUtils.isNotBlank(routingKey)) {
String routing = jsonObject.getString(routingKey);
if (StringUtils.isNotBlank(routing)) {
indexRequest.routing(routing);
}
}
if (jsonObject.containsKey(Const.PARENT)) {
String parent = jsonObject.getString(Const.PARENT);
if (StringUtils.isNotBlank(parentKey)) {
String parent = jsonObject.getString(parentKey);
if (StringUtils.isNotBlank(parent)) {
indexRequest.parent(parent);
}
}
// if (jsonObject.containsKey(Const.ID)) {
// String id = jsonObject.getString(Const.ID);
// if (StringUtils.isNotBlank(id)) {
// indexRequest.id(id);
// }
// } else if (jsonObject.containsKey("ID")) {
// String id = jsonObject.getString("ID");
// if (StringUtils.isNotBlank(id)) {
// indexRequest.id(id);
// }
// }
//
// if (jsonObject.containsKey(Const.ROUTING)) {
// String routing = jsonObject.getString(Const.ROUTING);
// if (StringUtils.isNotBlank(routing)) {
// indexRequest.routing(routing);
// }
// }
//
// if (jsonObject.containsKey(Const.PARENT)) {
// String parent = jsonObject.getString(Const.PARENT);
// if (StringUtils.isNotBlank(parent)) {
// indexRequest.parent(parent);
// }
// }
return indexRequest;
}
}
package indi.felix.easy.core.elastic.utool;
import org.elasticsearch.common.settings.Settings;
import java.sql.Connection;
import java.sql.DriverManager;
public class Main {
public static void main(String[] args) {
// Connection conn = DriverManager.getConnection(url, user, password);
// // 监控线程
// Monitor monitorService = new Monitor();
// monitorService.monitorToES();
// // 已办生产者线程
// Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
// pendProducerThread.start();
// // 已阅生产者线程
// Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
// readProducerThread.start();
//
// String cName = meta.get("cName");//es集群名字
// String esNodes = meta.get("esNodes");//es集群ip节点
// Settings esSetting = Settings.builder()
// .put("cluster.name", cName)
// .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群
// .put("thread_pool.search.size", 5)//增加线程池个数,暂时设为5
// .build();
// String[] nodes = esNodes.split(",");
// Object client =new PreBuiltTransportClient(esSetting);
// for(String node :nodes){
// if (node.length() > 0) {
// String[] hostPort = node.split(":");
// client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
// }
// }
}
}
package indi.felix.easy.core.elastic.utool;
public class Monitor {
public void monitorToES() {
// new Thread(() -> {
// while (true) {
// StringBuilder sb = new StringBuilder();
// sb.append("已办表数::").append(Const.TBL.TBL_PEND_COUNT)
// .append("::已办总数::").append(Const.COUNTER.LD_P_TOTAL)
// .append("::已办入库总数::").append(Const.COUNTER.LD_P);
// sb.append("~~~~已阅表数::").append(Const.TBL.TBL_READ_COUNT);
// sb.append("::已阅总数::").append(Const.COUNTER.LD_R_TOTAL)
// .append("::已阅入库总数::").append(Const.COUNTER.LD_R);
// if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
// ldPrevPendCount = Const.COUNTER.LD_P.get();
// ldPrevReadCount = Const.COUNTER.LD_R.get();
// start = System.currentTimeMillis();
// } else {
// long end = System.currentTimeMillis();
// if ((end - start) / 1000 >= 60) {
// start = end;
// sb.append("\n#########################################\n");
// sb.append("已办每分钟TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "条");
// sb.append("::已阅每分钟TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "条");
// ldPrevPendCount = Const.COUNTER.LD_P.get();
// ldPrevReadCount = Const.COUNTER.LD_R.get();
// }
// }
// System.out.println(sb.toString());
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
}
}
package indi.felix.easy.core.elastic.utool;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ZlPendProducer implements Runnable {
// private final static String threadName = "ES-Import-";
private static int THREADS = 21;
// public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
// public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
public ZlPendProducer(Connection connectionnn, String threadName){
}
@Override
public void run() {
// System.out.println(threadName + "::启动...");
// for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) {
// try {
// int size = 1000;
// for (int i = 0; i < count; i += size) {
// if (i + size > count) {
// //作用为size最后没有100条数据则剩余几条newList中就装几条
// size = count - i;
// }
// String sql = "select * from " + tableName + " limit " + i + ", " + size;
// System.out.println(tableName + "::sql::" + sql);
// rs = statement.executeQuery(sql);
// List<HistPendingEntity> lst = new ArrayList<>();
// while (rs.next()) {
// HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
// lst.add(p);
// }
// MteExecutor.POR.submit(new ZlPendConsumer(lst));
// Thread.sleep(2000);
// }
//
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}
}
package indi.felix.easy.core.elastic.utool;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ZlReadProducer implements Runnable {
private static int THREADS = 21;
// public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
// public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);
public ZlReadProducer(Connection connectionnn, String threadName){
}
@Override
public void run() {
// System.out.println(threadName + "::启动...");
// for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) {
// try {
// int size = 1000;
// for (int i = 0; i < count; i += size) {
// if (i + size > count) {
// //作用为size最后没有100条数据则剩余几条newList中就装几条
// size = count - i;
// }
// String sql = "select * from " + tableName + " limit " + i + ", " + size;
// System.out.println(tableName + "::sql::" + sql);
// rs = statement.executeQuery(sql);
// List<HistPendingEntity> lst = new ArrayList<>();
// while (rs.next()) {
// HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
// lst.add(p);
// }
// MteExecutor.POR.submit(new ZlPendConsumer(lst));
// Thread.sleep(2000);
// }
//
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}
}
......@@ -43,7 +43,7 @@ public class EasyRestTest extends TestCase {
easyEs.rest().document().addDoc(testIndexName, "123457", "123457", map2);
Assert.assertEquals("{\"_index\":\"test_20210907\",\"_type\":\"doc\",\"_id\":\"123456\",\"_version\":1,\"found\":true,\"_source\":{\"name\":\"李四\",\"age\":\"11\"}}",
Assert.assertEquals("{\"docs\":[{\"_index\":\"test_20210907\",\"_type\":\"_doc\",\"_id\":\"123456\",\"_version\":1,\"_seq_no\":1,\"_primary_term\":1,\"found\":true,\"_source\":{\"name\":\"李四\",\"age\":\"11\"}}]}",
easyEs.rest().search().requestById(testIndexName, "123456"));
......
......@@ -13,6 +13,9 @@ public class SearchRestApiTest extends TestCase {
String res = easyEs.rest().search().requestByIds(testIndexName, ids);
System.out.println(res);
String re1s = easyEs.rest().search().requestById(testIndexName, "pa_18978559");
System.out.println(re1s);
}
......
......@@ -7,7 +7,7 @@
<groupId>indi.felix.easy</groupId>
<artifactId>easy-es</artifactId>
<packaging>pom</packaging>
<version>1.0.6</version>
<version>1.0.8</version>
<modules>
<module>easy-es-core</module>
<!-- <module>easy-es-spring</module>-->
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册