diff --git a/oap-server/pom.xml b/oap-server/pom.xml index dba03700a23f5b1f06224d50f3abc8abfbee42a8..f663e2d8bbd17fd9349985a38d67060d41665488 100644 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -62,7 +62,7 @@ 2.0.3 1.4 2.6 - 6.3.2 + 6.7.0 2.9.9 2.0.0 3.1.0 diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java index 0e65d720553ec2c7791a014a0e9e1e12f1761221..1d57a0da5504a759ae52d8e12b588eebd62de7df 100644 --- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java @@ -18,53 +18,35 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; +import java.io.IOException; +import java.util.*; +import java.util.function.BiConsumer; import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.auth.*; import org.apache.http.client.CredentialsProvider; -import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.apache.http.nio.entity.NStringEntity; import org.apache.skywalking.oap.server.library.client.Client; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.get.MultiGetRequest; -import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.bulk.*; +import org.elasticsearch.action.get.*; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.*; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.unit.*; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.*; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import org.slf4j.*; /** * @author peng-yongsheng @@ -94,13 +76,7 @@ public class ElasticSearchClient implements Client { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); builder = RestClient.builder(pairsList.toArray(new HttpHost[0])) - .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { - @Override - public HttpAsyncClientBuilder customizeHttpClient( - HttpAsyncClientBuilder httpClientBuilder) { - return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } - }); + .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } else { builder = RestClient.builder(pairsList.toArray(new HttpHost[0])); } @@ -133,8 +109,8 @@ public class ElasticSearchClient implements Client { indexName = formatIndexName(indexName); CreateIndexRequest request = new CreateIndexRequest(indexName); request.settings(settings); - request.mapping(TYPE, mappingBuilder); - CreateIndexResponse response = client.indices().create(request); + request.mapping(mappingBuilder); + CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } @@ -142,17 +118,44 @@ public class ElasticSearchClient implements Client { public boolean deleteIndex(String indexName) throws IOException { indexName = formatIndexName(indexName); DeleteIndexRequest request = new DeleteIndexRequest(indexName); - DeleteIndexResponse response; - response = client.indices().delete(request); + AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT); logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } public boolean isExistsIndex(String indexName) throws IOException { indexName = formatIndexName(indexName); - GetIndexRequest request = new GetIndexRequest(); - request.indices(indexName); - return client.indices().exists(request); + GetIndexRequest request = new GetIndexRequest(indexName); + return client.indices().exists(request, RequestOptions.DEFAULT); + } + + public boolean isExistsTemplate(String indexName) throws IOException { + indexName = formatIndexName(indexName); + IndexTemplatesExistRequest request = new IndexTemplatesExistRequest(indexName); + return client.indices().existsTemplate(request, RequestOptions.DEFAULT); + } + + public boolean createTemplate(String indexName, Settings settings, + XContentBuilder mappingBuilder) throws IOException { + indexName = formatIndexName(indexName); + + org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest request = new PutIndexTemplateRequest(indexName); + request.patterns(Collections.singletonList(indexName + "*")); + request.settings(settings); + request.mapping("_doc", mappingBuilder); + + AcknowledgedResponse response = client.indices().putTemplate(request, RequestOptions.DEFAULT); + logger.debug("create {} template finished, isAcknowledged: {}", indexName, response.isAcknowledged()); + return response.isAcknowledged(); + } + + public boolean deleteTemplate(String indexName) throws IOException { + indexName = formatIndexName(indexName); + + DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(); + request.name(indexName); + AcknowledgedResponse deleteTemplateAcknowledge = client.indices().deleteTemplate(request, RequestOptions.DEFAULT); + return deleteTemplateAcknowledge.isAcknowledged(); } public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException { @@ -160,39 +163,39 @@ public class ElasticSearchClient implements Client { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.types(TYPE); searchRequest.source(searchSourceBuilder); - return client.search(searchRequest); + return client.search(searchRequest, RequestOptions.DEFAULT); } public GetResponse get(String indexName, String id) throws IOException { indexName = formatIndexName(indexName); GetRequest request = new GetRequest(indexName, TYPE, id); - return client.get(request); + return client.get(request, RequestOptions.DEFAULT); } public MultiGetResponse multiGet(String indexName, List ids) throws IOException { final String newIndexName = formatIndexName(indexName); MultiGetRequest request = new MultiGetRequest(); ids.forEach(id -> request.add(newIndexName, TYPE, id)); - return client.multiGet(request); + return client.mget(request, RequestOptions.DEFAULT); } public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException { IndexRequest request = prepareInsert(indexName, id, source); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.index(request); + client.index(request, RequestOptions.DEFAULT); } public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException { UpdateRequest request = prepareUpdate(indexName, id, source); request.version(version); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.update(request); + client.update(request, RequestOptions.DEFAULT); } public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException { UpdateRequest request = prepareUpdate(indexName, id, source); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.update(request); + client.update(request, RequestOptions.DEFAULT); } public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) { @@ -207,20 +210,14 @@ public class ElasticSearchClient implements Client { public int delete(String indexName, String timeBucketColumnName, long endTimeBucket) throws IOException { indexName = formatIndexName(indexName); - Map params = Collections.singletonMap("conflicts", "proceed"); - String jsonString = "{" + - " \"query\": {" + - " \"range\": {" + - " \"" + timeBucketColumnName + "\": {" + - " \"lte\": " + endTimeBucket + - " }" + - " }" + - " }" + - "}"; - HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON); - Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity); - logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString); - return response.getStatusLine().getStatusCode(); + + DeleteByQueryRequest request = new DeleteByQueryRequest(); + request.indices(indexName); + request.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket)); + + BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT); + logger.debug("Delete data from index {}, deleted {}", indexName, response.getDeleted()); + return 200; } public String formatIndexName(String indexName) { @@ -235,22 +232,30 @@ public class ElasticSearchClient implements Client { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { - + int numberOfActions = request.numberOfActions(); + logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override - public void afterBulk(long executionId, BulkRequest request, - BulkResponse response) { - + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + logger.warn("Bulk [{}] executed with failures", executionId); + } else { + logger.debug("Bulk [{}] completed in {} milliseconds", + executionId, response.getTook().getMillis()); + } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure); + logger.error("Failed to execute bulk", failure); } }; - return BulkProcessor.builder(client::bulkAsync, listener) + BiConsumer> bulkConsumer = + (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + + return BulkProcessor.builder(bulkConsumer, listener) .setBulkActions(bulkActions) .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))