From e6a7720f0357f2cff9077a70e4c135d704feb2b6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com>
Date: Sun, 31 Mar 2019 04:19:10 +0800
Subject: [PATCH] Provide create, delete, isExists template method by elastic
client. (#2425)
---
oap-server/pom.xml | 2 +-
.../elasticsearch/ElasticSearchClient.java | 155 +++++++++---------
2 files changed, 81 insertions(+), 76 deletions(-)
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index dba03700a2..f663e2d8bb 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -62,7 +62,7 @@
2.0.3
1.4
2.6
- 6.3.2
+ 6.7.0
2.9.9
2.0.0
3.1.0
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 0e65d72055..1d57a0da55 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -18,53 +18,35 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
+import java.io.IOException;
+import java.util.*;
+import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.auth.*;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetRequest;
-import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.unit.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.*;
import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -94,13 +76,7 @@ public class ElasticSearchClient implements Client {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
- .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
- @Override
- public HttpAsyncClientBuilder customizeHttpClient(
- HttpAsyncClientBuilder httpClientBuilder) {
- return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
- });
+ .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
@@ -133,8 +109,8 @@ public class ElasticSearchClient implements Client {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
- request.mapping(TYPE, mappingBuilder);
- CreateIndexResponse response = client.indices().create(request);
+ request.mapping(mappingBuilder);
+ CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
@@ -142,17 +118,44 @@ public class ElasticSearchClient implements Client {
public boolean deleteIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
- DeleteIndexResponse response;
- response = client.indices().delete(request);
+ AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean isExistsIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
- GetIndexRequest request = new GetIndexRequest();
- request.indices(indexName);
- return client.indices().exists(request);
+ GetIndexRequest request = new GetIndexRequest(indexName);
+ return client.indices().exists(request, RequestOptions.DEFAULT);
+ }
+
+ public boolean isExistsTemplate(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+ IndexTemplatesExistRequest request = new IndexTemplatesExistRequest(indexName);
+ return client.indices().existsTemplate(request, RequestOptions.DEFAULT);
+ }
+
+ public boolean createTemplate(String indexName, Settings settings,
+ XContentBuilder mappingBuilder) throws IOException {
+ indexName = formatIndexName(indexName);
+
+ org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest request = new PutIndexTemplateRequest(indexName);
+ request.patterns(Collections.singletonList(indexName + "*"));
+ request.settings(settings);
+ request.mapping("_doc", mappingBuilder);
+
+ AcknowledgedResponse response = client.indices().putTemplate(request, RequestOptions.DEFAULT);
+ logger.debug("create {} template finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ return response.isAcknowledged();
+ }
+
+ public boolean deleteTemplate(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+
+ DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest();
+ request.name(indexName);
+ AcknowledgedResponse deleteTemplateAcknowledge = client.indices().deleteTemplate(request, RequestOptions.DEFAULT);
+ return deleteTemplateAcknowledge.isAcknowledged();
}
public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
@@ -160,39 +163,39 @@ public class ElasticSearchClient implements Client {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source(searchSourceBuilder);
- return client.search(searchRequest);
+ return client.search(searchRequest, RequestOptions.DEFAULT);
}
public GetResponse get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, TYPE, id);
- return client.get(request);
+ return client.get(request, RequestOptions.DEFAULT);
}
public MultiGetResponse multiGet(String indexName, List ids) throws IOException {
final String newIndexName = formatIndexName(indexName);
MultiGetRequest request = new MultiGetRequest();
ids.forEach(id -> request.add(newIndexName, TYPE, id));
- return client.multiGet(request);
+ return client.mget(request, RequestOptions.DEFAULT);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.index(request);
+ client.index(request, RequestOptions.DEFAULT);
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
UpdateRequest request = prepareUpdate(indexName, id, source);
request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.update(request);
+ client.update(request, RequestOptions.DEFAULT);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
UpdateRequest request = prepareUpdate(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.update(request);
+ client.update(request, RequestOptions.DEFAULT);
}
public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
@@ -207,20 +210,14 @@ public class ElasticSearchClient implements Client {
public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException {
indexName = formatIndexName(indexName);
- Map params = Collections.singletonMap("conflicts", "proceed");
- String jsonString = "{" +
- " \"query\": {" +
- " \"range\": {" +
- " \"" + timeBucketColumnName + "\": {" +
- " \"lte\": " + endTimeBucket +
- " }" +
- " }" +
- " }" +
- "}";
- HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
- Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
- logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
- return response.getStatusLine().getStatusCode();
+
+ DeleteByQueryRequest request = new DeleteByQueryRequest();
+ request.indices(indexName);
+ request.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
+
+ BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
+ logger.debug("Delete data from index {}, deleted {}", indexName, response.getDeleted());
+ return 200;
}
public String formatIndexName(String indexName) {
@@ -235,22 +232,30 @@ public class ElasticSearchClient implements Client {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
-
+ int numberOfActions = request.numberOfActions();
+ logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
- public void afterBulk(long executionId, BulkRequest request,
- BulkResponse response) {
-
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ logger.warn("Bulk [{}] executed with failures", executionId);
+ } else {
+ logger.debug("Bulk [{}] completed in {} milliseconds",
+ executionId, response.getTook().getMillis());
+ }
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
+ logger.error("Failed to execute bulk", failure);
}
};
- return BulkProcessor.builder(client::bulkAsync, listener)
+ BiConsumer> bulkConsumer =
+ (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
+
+ return BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(bulkActions)
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
--
GitLab