提交 e6a7720f 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Provide create, delete, isExists template method by elastic client. (#2425)

上级 601b472e
...@@ -62,7 +62,7 @@ ...@@ -62,7 +62,7 @@
<shardingjdbc.version>2.0.3</shardingjdbc.version> <shardingjdbc.version>2.0.3</shardingjdbc.version>
<commons-dbcp.version>1.4</commons-dbcp.version> <commons-dbcp.version>1.4</commons-dbcp.version>
<commons-io.version>2.6</commons-io.version> <commons-io.version>2.6</commons-io.version>
<elasticsearch.version>6.3.2</elasticsearch.version> <elasticsearch.version>6.7.0</elasticsearch.version>
<joda-time.version>2.9.9</joda-time.version> <joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version> <kubernetes.version>2.0.0</kubernetes.version>
<hikaricp.version>3.1.0</hikaricp.version> <hikaricp.version>3.1.0</hikaricp.version>
......
...@@ -18,53 +18,35 @@ ...@@ -18,53 +18,35 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch; package org.apache.skywalking.oap.server.library.client.elasticsearch;
import java.io.IOException;
import java.util.*;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.*;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.Client;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.get.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Response; import org.elasticsearch.client.*;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.*;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.*;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger; import org.slf4j.*;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -94,13 +76,7 @@ public class ElasticSearchClient implements Client { ...@@ -94,13 +76,7 @@ public class ElasticSearchClient implements Client {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
builder = RestClient.builder(pairsList.toArray(new HttpHost[0])) builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
} else { } else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0])); builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
} }
...@@ -133,8 +109,8 @@ public class ElasticSearchClient implements Client { ...@@ -133,8 +109,8 @@ public class ElasticSearchClient implements Client {
indexName = formatIndexName(indexName); indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName); CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings); request.settings(settings);
request.mapping(TYPE, mappingBuilder); request.mapping(mappingBuilder);
CreateIndexResponse response = client.indices().create(request); CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged(); return response.isAcknowledged();
} }
...@@ -142,17 +118,44 @@ public class ElasticSearchClient implements Client { ...@@ -142,17 +118,44 @@ public class ElasticSearchClient implements Client {
public boolean deleteIndex(String indexName) throws IOException { public boolean deleteIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName); indexName = formatIndexName(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(indexName); DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexResponse response; AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
response = client.indices().delete(request);
logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged(); return response.isAcknowledged();
} }
public boolean isExistsIndex(String indexName) throws IOException { public boolean isExistsIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName); indexName = formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest(); GetIndexRequest request = new GetIndexRequest(indexName);
request.indices(indexName); return client.indices().exists(request, RequestOptions.DEFAULT);
return client.indices().exists(request); }
public boolean isExistsTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
IndexTemplatesExistRequest request = new IndexTemplatesExistRequest(indexName);
return client.indices().existsTemplate(request, RequestOptions.DEFAULT);
}
public boolean createTemplate(String indexName, Settings settings,
XContentBuilder mappingBuilder) throws IOException {
indexName = formatIndexName(indexName);
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest request = new PutIndexTemplateRequest(indexName);
request.patterns(Collections.singletonList(indexName + "*"));
request.settings(settings);
request.mapping("_doc", mappingBuilder);
AcknowledgedResponse response = client.indices().putTemplate(request, RequestOptions.DEFAULT);
logger.debug("create {} template finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean deleteTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest();
request.name(indexName);
AcknowledgedResponse deleteTemplateAcknowledge = client.indices().deleteTemplate(request, RequestOptions.DEFAULT);
return deleteTemplateAcknowledge.isAcknowledged();
} }
public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException { public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
...@@ -160,39 +163,39 @@ public class ElasticSearchClient implements Client { ...@@ -160,39 +163,39 @@ public class ElasticSearchClient implements Client {
SearchRequest searchRequest = new SearchRequest(indexName); SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE); searchRequest.types(TYPE);
searchRequest.source(searchSourceBuilder); searchRequest.source(searchSourceBuilder);
return client.search(searchRequest); return client.search(searchRequest, RequestOptions.DEFAULT);
} }
public GetResponse get(String indexName, String id) throws IOException { public GetResponse get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName); indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, TYPE, id); GetRequest request = new GetRequest(indexName, TYPE, id);
return client.get(request); return client.get(request, RequestOptions.DEFAULT);
} }
public MultiGetResponse multiGet(String indexName, List<String> ids) throws IOException { public MultiGetResponse multiGet(String indexName, List<String> ids) throws IOException {
final String newIndexName = formatIndexName(indexName); final String newIndexName = formatIndexName(indexName);
MultiGetRequest request = new MultiGetRequest(); MultiGetRequest request = new MultiGetRequest();
ids.forEach(id -> request.add(newIndexName, TYPE, id)); ids.forEach(id -> request.add(newIndexName, TYPE, id));
return client.multiGet(request); return client.mget(request, RequestOptions.DEFAULT);
} }
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = prepareInsert(indexName, id, source); IndexRequest request = prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request); client.index(request, RequestOptions.DEFAULT);
} }
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException { public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
UpdateRequest request = prepareUpdate(indexName, id, source); UpdateRequest request = prepareUpdate(indexName, id, source);
request.version(version); request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request); client.update(request, RequestOptions.DEFAULT);
} }
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException { public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
UpdateRequest request = prepareUpdate(indexName, id, source); UpdateRequest request = prepareUpdate(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request); client.update(request, RequestOptions.DEFAULT);
} }
public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) { public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
...@@ -207,20 +210,14 @@ public class ElasticSearchClient implements Client { ...@@ -207,20 +210,14 @@ public class ElasticSearchClient implements Client {
public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException { public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
indexName = formatIndexName(indexName); indexName = formatIndexName(indexName);
Map<String, String> params = Collections.singletonMap("conflicts", "proceed");
String jsonString = "{" + DeleteByQueryRequest request = new DeleteByQueryRequest();
" \"query\": {" + request.indices(indexName);
" \"range\": {" + request.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
" \"" + timeBucketColumnName + "\": {" +
" \"lte\": " + endTimeBucket + BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
" }" + logger.debug("Delete data from index {}, deleted {}", indexName, response.getDeleted());
" }" + return 200;
" }" +
"}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
return response.getStatusLine().getStatusCode();
} }
public String formatIndexName(String indexName) { public String formatIndexName(String indexName) {
...@@ -235,22 +232,30 @@ public class ElasticSearchClient implements Client { ...@@ -235,22 +232,30 @@ public class ElasticSearchClient implements Client {
BulkProcessor.Listener listener = new BulkProcessor.Listener() { BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override @Override
public void beforeBulk(long executionId, BulkRequest request) { public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
} }
@Override @Override
public void afterBulk(long executionId, BulkRequest request, public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
BulkResponse response) { if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
} }
@Override @Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure); logger.error("Failed to execute bulk", failure);
} }
}; };
return BulkProcessor.builder(client::bulkAsync, listener) BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
return BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(bulkActions) .setBulkActions(bulkActions)
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB)) .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval)) .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册