From 5adbc1f99958a09fc1cd5191fbe2ff76a70b4c85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Tue, 8 Sep 2020 09:05:23 +0800 Subject: [PATCH] [Security] Upgrade the snakeyaml verion to 1.26 (#7994) Fixes #7928 ### Motivation As https://nvd.nist.gov/vuln/detail/CVE-2017-18640 said, the `snakeyaml` < 1.26 ### Modifications In `pulsar-functions` model: - The `snakeyaml` 1.19 appears to be included from dependency on org.apache.pulsar:pulsar-functions-secrets:jar:2.6.1 based on included dependency of io.kubernetes:client-java-api:jar:2.0.0:compile Fixed in 9.0.2 - The `snakeyaml` 1.16 appears to be included from the dependency on org.apache.pulsar:pulsar-functions-instance:jar:2.6.1 based on io.prometheus.jmx:collector:jar:0.12.0 Fixed in 0.13.0 - The 1.17 org.apache.pulsar.tests:integration:test-jar:tests:2.6.1:test depends on org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:6.3.2:test Fixed in elasticsearch >= 7.7.1 (7.9.1 current) --- .../server/src/assemble/LICENSE.bin.txt | 23 ++++--- pom.xml | 4 +- pulsar-functions/runtime/pom.xml | 2 +- .../auth/KubernetesFunctionAuthProvider.java | 4 +- .../KubernetesSecretsTokenAuthProvider.java | 34 +++++------ .../BasicKubernetesManifestCustomizer.java | 2 +- .../KubernetesManifestCustomizer.java | 4 +- .../runtime/kubernetes/KubernetesRuntime.java | 60 +++++++++---------- .../kubernetes/KubernetesRuntimeFactory.java | 10 ++-- ...ubernetesSecretsTokenAuthProviderTest.java | 18 +++--- .../KubernetesRuntimeFactoryTest.java | 10 ++-- .../kubernetes/KubernetesRuntimeTest.java | 6 +- .../runtime/process/ProcessRuntimeTest.java | 6 +- pulsar-functions/secrets/pom.xml | 2 +- .../DefaultSecretsProviderConfigurator.java | 2 +- ...KubernetesSecretsProviderConfigurator.java | 10 +++- .../SecretsProviderConfigurator.java | 6 +- .../io/elasticsearch/ElasticSearchSink.java | 11 ++-- .../io/ElasticSearchSinkTester.java | 5 +- 19 files changed, 113 insertions(+), 106 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index e08449f137e..0ffe2ce4c03 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -322,7 +322,16 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.module-jackson-module-jsonSchema-2.11.1.jar * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.6.2.jar * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-1.12.0.jar - * Gson -- com.google.code.gson-gson-2.8.2.jar + * Joda -- org.joda-joda-convert-2.2.1.jar + * Bitbucket -- org.bitbucket.b_c-jose4j-0.7.2.jar + * Gson + - com.google.code.gson-gson-2.8.2.jar + - io.gsonfire-gson-fire-1.8.4.jar + * Sundrio + - io.sundr-builder-annotations-0.21.0.jar + - io.sundr-resourcecify-annotations-0.21.0.jar + - io.sundr-sundr-codegen-0.21.0.jar + - io.sundr-sundr-core-0.21.0.jar * Guava - com.google.guava-guava-25.1-jre.jar * J2ObjC Annotations -- com.google.j2objc-j2objc-annotations-1.1.jar @@ -371,7 +380,7 @@ The Apache Software License, Version 2.0 - io.prometheus-simpleclient_servlet-0.5.0.jar - io.prometheus-simpleclient_log4j2-0.5.0.jar - io.prometheus-simpleclient_jetty-0.5.0.jar - - io.prometheus.jmx-collector-0.12.0.jar + - io.prometheus.jmx-collector-0.14.0.jar - io.prometheus-simpleclient_caffeine-0.5.0.jar * Jakarta Bean Validation API - jakarta.validation-jakarta.validation-api-2.0.2.jar @@ -468,12 +477,12 @@ The Apache Software License, Version 2.0 * @FreeBuilder - org.inferred-freebuilder-1.14.9.jar * Squareup - - com.squareup.okhttp-logging-interceptor-2.7.5.jar - - com.squareup.okhttp-okhttp-ws-2.7.5.jar + - com.squareup.okhttp3-logging-interceptor-3.14.3.jar + - com.squareup.okhttp3-okhttp-3.14.3.jar * Kubernetes Client - - io.kubernetes-client-java-2.0.0.jar - - io.kubernetes-client-java-api-2.0.0.jar - - io.kubernetes-client-java-proto-2.0.0.jar + - io.kubernetes-client-java-9.0.2.jar + - io.kubernetes-client-java-api-9.0.2.jar + - io.kubernetes-client-java-proto-9.0.2.jar * Joda Time - joda-time-2.10.1.jar - joda-time-joda-time-2.10.1.jar diff --git a/pom.xml b/pom.xml index fa12e93ca04..f9e2ad94d5b 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ flexible messaging model and an intuitive client API. 2.6.0 3.2.0 9.3.24.v20180605 - 6.3.2 + 7.9.1 332 1.6.0 2.11 @@ -154,7 +154,7 @@ flexible messaging model and an intuitive client API. 2.3.0 25.1-jre 1.0 - 0.12.0 + 0.14.0 5.3.2 1.9.13 0.16 diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml index 2c71852e8ec..cf8f09f7f0b 100644 --- a/pulsar-functions/runtime/pom.xml +++ b/pulsar-functions/runtime/pom.xml @@ -63,7 +63,7 @@ io.kubernetes client-java - 2.0.0 + 9.0.2 compile diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java index dacd3a1f3a5..912f0d38d4b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.functions.auth; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1StatefulSet; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1StatefulSet; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.common.util.Reflections; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java index 7119c744325..98b1cfaa40d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java @@ -19,16 +19,16 @@ package org.apache.pulsar.functions.auth; import com.google.common.annotations.VisibleForTesting; -import io.kubernetes.client.ApiException; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1DeleteOptions; -import io.kubernetes.client.models.V1ObjectMeta; -import io.kubernetes.client.models.V1PodSpec; -import io.kubernetes.client.models.V1Secret; -import io.kubernetes.client.models.V1SecretVolumeSource; -import io.kubernetes.client.models.V1StatefulSet; -import io.kubernetes.client.models.V1Volume; -import io.kubernetes.client.models.V1VolumeMount; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1DeleteOptions; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1Secret; +import io.kubernetes.client.openapi.models.V1SecretVolumeSource; +import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1Volume; +import io.kubernetes.client.openapi.models.V1VolumeMount; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -173,16 +173,12 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS) .supplier(() -> { try { - V1DeleteOptions v1DeleteOptions = new V1DeleteOptions(); - v1DeleteOptions.setGracePeriodSeconds(0L); - v1DeleteOptions.setPropagationPolicy("Foreground"); - // make sure secretName is not null or empty string. // If deleteNamespacedSecret is called and secret name is null or empty string // it will delete all the secrets in the namespace coreClient.deleteNamespacedSecret(secretName, - kubeNamespace, v1DeleteOptions, "true", - null, null, null); + kubeNamespace, null, "true", + 0, null, "Foreground", null); } catch (ApiException e) { // if already deleted if (e.getCode() == HTTP_NOT_FOUND) { @@ -302,11 +298,11 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut .data(buildSecretMap(token)); try { - coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null); + coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, "true", null); } catch (ApiException e) { if (e.getCode() == HTTP_CONFLICT) { try { - coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, null); + coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, null, "true", null); return Actions.ActionResult.builder().success(true).build(); } catch (ApiException e1) { @@ -358,7 +354,7 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut .metadata(new V1ObjectMeta().name(getSecretName(id))) .data(buildSecretMap(token)); try { - coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true"); + coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, "true", null); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java index 714d1d5a1c5..dc14b6ab34f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java @@ -20,7 +20,7 @@ package org.apache.pulsar.functions.runtime.kubernetes; import com.google.gson.Gson; import io.kubernetes.client.custom.Quantity; -import io.kubernetes.client.models.*; +import io.kubernetes.client.openapi.models.*; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java index b389cf6733b..fb69dfac74d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.functions.runtime.kubernetes; -import io.kubernetes.client.models.V1Service; -import io.kubernetes.client.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1Service; +import io.kubernetes.client.openapi.models.V1StatefulSet; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.runtime.RuntimeCustomizer; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 844379106bb..cbb5f9b5963 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -26,33 +26,34 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.protobuf.Empty; -import com.squareup.okhttp.Response; + import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.kubernetes.client.ApiException; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.custom.Quantity; -import io.kubernetes.client.models.V1Container; -import io.kubernetes.client.models.V1ContainerPort; -import io.kubernetes.client.models.V1DeleteOptions; -import io.kubernetes.client.models.V1EnvVar; -import io.kubernetes.client.models.V1EnvVarSource; -import io.kubernetes.client.models.V1LabelSelector; -import io.kubernetes.client.models.V1ObjectFieldSelector; -import io.kubernetes.client.models.V1ObjectMeta; -import io.kubernetes.client.models.V1PodList; -import io.kubernetes.client.models.V1PodSpec; -import io.kubernetes.client.models.V1PodTemplateSpec; -import io.kubernetes.client.models.V1ResourceRequirements; -import io.kubernetes.client.models.V1Service; -import io.kubernetes.client.models.V1ServicePort; -import io.kubernetes.client.models.V1ServiceSpec; -import io.kubernetes.client.models.V1StatefulSet; -import io.kubernetes.client.models.V1StatefulSetSpec; -import io.kubernetes.client.models.V1Toleration; +import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1ContainerPort; +import io.kubernetes.client.openapi.models.V1DeleteOptions; +import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1EnvVarSource; +import io.kubernetes.client.openapi.models.V1LabelSelector; +import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodTemplateSpec; +import io.kubernetes.client.openapi.models.V1ResourceRequirements; +import io.kubernetes.client.openapi.models.V1Service; +import io.kubernetes.client.openapi.models.V1ServicePort; +import io.kubernetes.client.openapi.models.V1ServiceSpec; +import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1StatefulSetSpec; +import io.kubernetes.client.openapi.models.V1Toleration; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import okhttp3.Response; import org.apache.commons.codec.digest.DigestUtils; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; import org.apache.pulsar.functions.instance.AuthenticationConfig; @@ -85,7 +86,6 @@ import java.util.regex.Pattern; import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; -import static org.apache.commons.lang3.StringUtils.defaultIfBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; @@ -422,7 +422,7 @@ public class KubernetesRuntime implements Runtime { .supplier(() -> { final V1Service response; try { - response = coreClient.createNamespacedService(jobNamespace, service, null); + response = coreClient.createNamespacedService(jobNamespace, service, null, "true", null); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { @@ -507,7 +507,7 @@ public class KubernetesRuntime implements Runtime { .supplier(() -> { final V1StatefulSet response; try { - response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, null); + response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, null, "true", null); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { @@ -558,8 +558,8 @@ public class KubernetesRuntime implements Runtime { // https://github.com/kubernetes-client/java/issues/86 response = appsClient.deleteNamespacedStatefulSetCall( statefulSetName, - jobNamespace, options, null, - null, null, null, + jobNamespace, null, "true", + 5, null, "Foreground", null, null) .execute(); } catch (ApiException e) { @@ -709,9 +709,9 @@ public class KubernetesRuntime implements Runtime { // https://github.com/kubernetes-client/java/issues/86 response = coreClient.deleteNamespacedServiceCall( serviceName, - jobNamespace, options, null, - null, null, - null, null, null).execute(); + jobNamespace, null, "true", + 0, null, + "Foreground", null, null).execute(); } catch (ApiException e) { // if already deleted if (e.getCode() == HTTP_NOT_FOUND) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index 58af72d8304..5a0a54a34b6 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -20,11 +20,11 @@ package org.apache.pulsar.functions.runtime.kubernetes; import com.google.common.annotations.VisibleForTesting; -import io.kubernetes.client.ApiClient; -import io.kubernetes.client.Configuration; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1ConfigMap; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ConfigMap; import io.kubernetes.client.util.Config; import java.nio.file.Paths; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java index 074904cf7dd..38dd91426b4 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java @@ -23,14 +23,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import io.kubernetes.client.ApiException; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1Container; -import io.kubernetes.client.models.V1PodSpec; -import io.kubernetes.client.models.V1PodTemplateSpec; -import io.kubernetes.client.models.V1Secret; -import io.kubernetes.client.models.V1StatefulSet; -import io.kubernetes.client.models.V1StatefulSetSpec; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodTemplateSpec; +import io.kubernetes.client.openapi.models.V1Secret; +import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1StatefulSetSpec; import java.util.Collections; import java.util.Optional; @@ -103,7 +103,7 @@ public class KubernetesSecretsTokenAuthProviderTest { @Test public void testCacheAuthData() throws ApiException { CoreV1Api coreV1Api = mock(CoreV1Api.class); - doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString()); + doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString(), anyString(), anyString()); KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(); kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, null, (fd) -> "default"); Function.FunctionDetails funcDetails = Function.FunctionDetails.newBuilder().setTenant("test-tenant").setNamespace("test-ns").setName("test-func").build(); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java index 6514ae7b14e..04cb7e08e6b 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java @@ -19,11 +19,11 @@ package org.apache.pulsar.functions.runtime.kubernetes; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1ConfigMap; -import io.kubernetes.client.models.V1PodSpec; -import io.kubernetes.client.models.V1StatefulSet; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ConfigMap; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1StatefulSet; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.common.functions.Resources; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index e9156cbff74..f2ae5d1f324 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -22,10 +22,10 @@ package org.apache.pulsar.functions.runtime.kubernetes; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.protobuf.util.JsonFormat; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.custom.Quantity; -import io.kubernetes.client.models.*; +import io.kubernetes.client.openapi.models.*; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.InstanceConfig; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java index 83477c17ebe..638780c61f5 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java @@ -37,9 +37,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1PodSpec; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1PodSpec; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; diff --git a/pulsar-functions/secrets/pom.xml b/pulsar-functions/secrets/pom.xml index c4500b5ab17..f15fbf03e89 100644 --- a/pulsar-functions/secrets/pom.xml +++ b/pulsar-functions/secrets/pom.xml @@ -34,7 +34,7 @@ io.kubernetes client-java - 2.0.0 + 9.0.2 compile diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java index 8d4dbffb72b..2a94ecebdb5 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java @@ -19,7 +19,7 @@ package org.apache.pulsar.functions.secretsproviderconfigurator; import com.google.gson.reflect.TypeToken; -import io.kubernetes.client.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodSpec; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java index 8d4e34ea7a3..81e50fd3818 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java @@ -20,9 +20,13 @@ package org.apache.pulsar.functions.secretsproviderconfigurator; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.*; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1EnvVarSource; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1SecretKeySelector; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider; diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java index 349559e2075..e51242ec837 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.functions.secretsproviderconfigurator; -import io.kubernetes.client.apis.AppsV1Api; -import io.kubernetes.client.apis.CoreV1Api; -import io.kubernetes.client.models.V1PodSpec; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1PodSpec; import org.apache.pulsar.functions.proto.Function; import java.lang.reflect.Type; diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java index d55510d1408..f44465e2ebe 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java @@ -41,10 +41,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -86,7 +83,7 @@ public class ElasticSearchSink implements Sink { indexRequest.source(keyValue.getValue(), XContentType.JSON); try { - IndexResponse indexResponse = getClient().index(indexRequest); + IndexResponse indexResponse = getClient().index(indexRequest, RequestOptions.DEFAULT); if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { record.ack(); } else { @@ -105,7 +102,7 @@ public class ElasticSearchSink implements Sink { private void createIndexIfNeeded() throws IOException { GetIndexRequest request = new GetIndexRequest(); request.indices(elasticSearchConfig.getIndexName()); - boolean exists = getClient().indices().exists(request); + boolean exists = getClient().indices().exists(request, RequestOptions.DEFAULT); if (!exists) { CreateIndexRequest cireq = new CreateIndexRequest(elasticSearchConfig.getIndexName()); @@ -114,7 +111,7 @@ public class ElasticSearchSink implements Sink { .put("index.number_of_shards", elasticSearchConfig.getIndexNumberOfShards()) .put("index.number_of_replicas", elasticSearchConfig.getIndexNumberOfReplicas())); - CreateIndexResponse ciresp = getClient().indices().create(cireq); + CreateIndexResponse ciresp = getClient().indices().create(cireq, RequestOptions.DEFAULT); if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) { throw new RuntimeException("Unable to create index."); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java index eee208e2e36..0f1cc8abfa3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java @@ -28,6 +28,7 @@ import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; @@ -65,8 +66,8 @@ public class ElasticSearchSinkTester extends SinkTester searchRequest.types("doc"); try { - SearchResponse searchResult = elasticClient.search(searchRequest); - assertTrue(searchResult.getHits().getTotalHits() > 0, searchResult.toString()); + SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT); + assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString()); } catch (Exception e) { fail("Encountered exception on validating elastic search results", e); } -- GitLab