未验证 提交 5adbc1f9 编写于 作者: 冉小龙 提交者: GitHub

[Security] Upgrade the snakeyaml verion to 1.26 (#7994)

Fixes #7928

### Motivation

As https://nvd.nist.gov/vuln/detail/CVE-2017-18640 said, the `snakeyaml` < 1.26

### Modifications

In `pulsar-functions` model:

- The `snakeyaml` 1.19 appears to be included from dependency on org.apache.pulsar:pulsar-functions-secrets:jar:2.6.1 based on included dependency of io.kubernetes:client-java-api:jar:2.0.0:compile Fixed in 9.0.2

- The `snakeyaml` 1.16 appears to be included from the dependency on org.apache.pulsar:pulsar-functions-instance:jar:2.6.1 based on io.prometheus.jmx:collector:jar:0.12.0 Fixed in 0.13.0

- The 1.17 org.apache.pulsar.tests:integration:test-jar:tests:2.6.1:test depends on org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:6.3.2:test Fixed in elasticsearch >= 7.7.1 (7.9.1 current)
上级 8f2540fc
......@@ -322,7 +322,16 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.module-jackson-module-jsonSchema-2.11.1.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.6.2.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-1.12.0.jar
* Gson -- com.google.code.gson-gson-2.8.2.jar
* Joda -- org.joda-joda-convert-2.2.1.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.7.2.jar
* Gson
- com.google.code.gson-gson-2.8.2.jar
- io.gsonfire-gson-fire-1.8.4.jar
* Sundrio
- io.sundr-builder-annotations-0.21.0.jar
- io.sundr-resourcecify-annotations-0.21.0.jar
- io.sundr-sundr-codegen-0.21.0.jar
- io.sundr-sundr-core-0.21.0.jar
* Guava
- com.google.guava-guava-25.1-jre.jar
* J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.1.jar
......@@ -371,7 +380,7 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_servlet-0.5.0.jar
- io.prometheus-simpleclient_log4j2-0.5.0.jar
- io.prometheus-simpleclient_jetty-0.5.0.jar
- io.prometheus.jmx-collector-0.12.0.jar
- io.prometheus.jmx-collector-0.14.0.jar
- io.prometheus-simpleclient_caffeine-0.5.0.jar
* Jakarta Bean Validation API
- jakarta.validation-jakarta.validation-api-2.0.2.jar
......@@ -468,12 +477,12 @@ The Apache Software License, Version 2.0
* @FreeBuilder
- org.inferred-freebuilder-1.14.9.jar
* Squareup
- com.squareup.okhttp-logging-interceptor-2.7.5.jar
- com.squareup.okhttp-okhttp-ws-2.7.5.jar
- com.squareup.okhttp3-logging-interceptor-3.14.3.jar
- com.squareup.okhttp3-okhttp-3.14.3.jar
* Kubernetes Client
- io.kubernetes-client-java-2.0.0.jar
- io.kubernetes-client-java-api-2.0.0.jar
- io.kubernetes-client-java-proto-2.0.0.jar
- io.kubernetes-client-java-9.0.2.jar
- io.kubernetes-client-java-api-9.0.2.jar
- io.kubernetes-client-java-proto-9.0.2.jar
* Joda Time
- joda-time-2.10.1.jar
- joda-time-joda-time-2.10.1.jar
......
......@@ -144,7 +144,7 @@ flexible messaging model and an intuitive client API.</description>
<mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
<hdfs-offload-version3>3.2.0</hdfs-offload-version3>
<org.eclipse.jetty-hdfs-offload>9.3.24.v20180605</org.eclipse.jetty-hdfs-offload>
<elasticsearch.version>6.3.2</elasticsearch.version>
<elasticsearch.version>7.9.1</elasticsearch.version>
<presto.version>332</presto.version>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
......@@ -154,7 +154,7 @@ flexible messaging model and an intuitive client API.</description>
<hbase.version>2.3.0</hbase.version>
<guava.version>25.1-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.12.0</prometheus-jmx.version>
<prometheus-jmx.version>0.14.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
<aircompressor.version>0.16</aircompressor.version>
......
......@@ -63,7 +63,7 @@
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>2.0.0</version>
<version>9.0.2</version>
<scope>compile</scope>
<exclusions>
<exclusion>
......
......@@ -18,8 +18,8 @@
*/
package org.apache.pulsar.functions.auth;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.common.util.Reflections;
......
......@@ -19,16 +19,16 @@
package org.apache.pulsar.functions.auth;
import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1DeleteOptions;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1Secret;
import io.kubernetes.client.models.V1SecretVolumeSource;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1Volume;
import io.kubernetes.client.models.V1VolumeMount;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1Secret;
import io.kubernetes.client.openapi.models.V1SecretVolumeSource;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1Volume;
import io.kubernetes.client.openapi.models.V1VolumeMount;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
......@@ -173,16 +173,12 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
try {
V1DeleteOptions v1DeleteOptions = new V1DeleteOptions();
v1DeleteOptions.setGracePeriodSeconds(0L);
v1DeleteOptions.setPropagationPolicy("Foreground");
// make sure secretName is not null or empty string.
// If deleteNamespacedSecret is called and secret name is null or empty string
// it will delete all the secrets in the namespace
coreClient.deleteNamespacedSecret(secretName,
kubeNamespace, v1DeleteOptions, "true",
null, null, null);
kubeNamespace, null, "true",
0, null, "Foreground", null);
} catch (ApiException e) {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
......@@ -302,11 +298,11 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
.data(buildSecretMap(token));
try {
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null);
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, "true", null);
} catch (ApiException e) {
if (e.getCode() == HTTP_CONFLICT) {
try {
coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, null);
coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, null, "true", null);
return Actions.ActionResult.builder().success(true).build();
} catch (ApiException e1) {
......@@ -358,7 +354,7 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
.metadata(new V1ObjectMeta().name(getSecretName(id)))
.data(buildSecretMap(token));
try {
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true");
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, "true", null);
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
......
......@@ -20,7 +20,7 @@ package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import io.kubernetes.client.openapi.models.*;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
......
......@@ -18,8 +18,8 @@
*/
package org.apache.pulsar.functions.runtime.kubernetes;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
......
......@@ -26,33 +26,34 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.protobuf.Empty;
import com.squareup.okhttp.Response;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1ContainerPort;
import io.kubernetes.client.models.V1DeleteOptions;
import io.kubernetes.client.models.V1EnvVar;
import io.kubernetes.client.models.V1EnvVarSource;
import io.kubernetes.client.models.V1LabelSelector;
import io.kubernetes.client.models.V1ObjectFieldSelector;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1PodList;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1PodTemplateSpec;
import io.kubernetes.client.models.V1ResourceRequirements;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1ServicePort;
import io.kubernetes.client.models.V1ServiceSpec;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1ContainerPort;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1EnvVar;
import io.kubernetes.client.openapi.models.V1EnvVarSource;
import io.kubernetes.client.openapi.models.V1LabelSelector;
import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
import io.kubernetes.client.openapi.models.V1Toleration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
......@@ -85,7 +86,6 @@ import java.util.regex.Pattern;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
......@@ -422,7 +422,7 @@ public class KubernetesRuntime implements Runtime {
.supplier(() -> {
final V1Service response;
try {
response = coreClient.createNamespacedService(jobNamespace, service, null);
response = coreClient.createNamespacedService(jobNamespace, service, null, "true", null);
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
......@@ -507,7 +507,7 @@ public class KubernetesRuntime implements Runtime {
.supplier(() -> {
final V1StatefulSet response;
try {
response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, null);
response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, null, "true", null);
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
......@@ -558,8 +558,8 @@ public class KubernetesRuntime implements Runtime {
// https://github.com/kubernetes-client/java/issues/86
response = appsClient.deleteNamespacedStatefulSetCall(
statefulSetName,
jobNamespace, options, null,
null, null, null,
jobNamespace, null, "true",
5, null, "Foreground",
null, null)
.execute();
} catch (ApiException e) {
......@@ -709,9 +709,9 @@ public class KubernetesRuntime implements Runtime {
// https://github.com/kubernetes-client/java/issues/86
response = coreClient.deleteNamespacedServiceCall(
serviceName,
jobNamespace, options, null,
null, null,
null, null, null).execute();
jobNamespace, null, "true",
0, null,
"Foreground", null, null).execute();
} catch (ApiException e) {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
......
......@@ -20,11 +20,11 @@
package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.util.Config;
import java.nio.file.Paths;
......
......@@ -23,14 +23,14 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1PodTemplateSpec;
import io.kubernetes.client.models.V1Secret;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1Secret;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
import java.util.Collections;
import java.util.Optional;
......@@ -103,7 +103,7 @@ public class KubernetesSecretsTokenAuthProviderTest {
@Test
public void testCacheAuthData() throws ApiException {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString(), anyString(), anyString());
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, null, (fd) -> "default");
Function.FunctionDetails funcDetails = Function.FunctionDetails.newBuilder().setTenant("test-tenant").setNamespace("test-ns").setName("test-func").build();
......
......@@ -19,11 +19,11 @@
package org.apache.pulsar.functions.runtime.kubernetes;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.functions.Resources;
......
......@@ -22,10 +22,10 @@ package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.protobuf.util.JsonFormat;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import io.kubernetes.client.openapi.models.*;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
......
......@@ -37,9 +37,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1PodSpec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
......
......@@ -34,7 +34,7 @@
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>2.0.0</version>
<version>9.0.2</version>
<scope>compile</scope>
<exclusions>
<exclusion>
......
......@@ -19,7 +19,7 @@
package org.apache.pulsar.functions.secretsproviderconfigurator;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodSpec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
......
......@@ -20,9 +20,13 @@ package org.apache.pulsar.functions.secretsproviderconfigurator;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.*;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1EnvVar;
import io.kubernetes.client.openapi.models.V1EnvVarSource;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1SecretKeySelector;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
......
......@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.functions.secretsproviderconfigurator;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1PodSpec;
import org.apache.pulsar.functions.proto.Function;
import java.lang.reflect.Type;
......
......@@ -41,10 +41,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
......@@ -86,7 +83,7 @@ public class ElasticSearchSink implements Sink<byte[]> {
indexRequest.source(keyValue.getValue(), XContentType.JSON);
try {
IndexResponse indexResponse = getClient().index(indexRequest);
IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT);
if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
record.ack();
} else {
......@@ -105,7 +102,7 @@ public class ElasticSearchSink implements Sink<byte[]> {
private void createIndexIfNeeded() throws IOException {
GetIndexRequest request = new GetIndexRequest();
request.indices(elasticSearchConfig.getIndexName());
boolean exists = getClient().indices().exists(request);
boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT);
if (!exists) {
CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName());
......@@ -114,7 +111,7 @@ public class ElasticSearchSink implements Sink<byte[]> {
.put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards())
.put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas()));
CreateIndexResponse ciresp = getClient().indices().create(cireq);
CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT);
if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
throw new RuntimeException("Unable to create index.");
}
......
......@@ -28,6 +28,7 @@ import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
......@@ -65,8 +66,8 @@ public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer>
searchRequest.types("doc");
try {
SearchResponse searchResult = elasticClient.search(searchRequest);
assertTrue(searchResult.getHits().getTotalHits() > 0, searchResult.toString());
SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
} catch (Exception e) {
fail("Encountered exception on validating elastic search results", e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册