From 460e1799d2b9069835ecae295c75f571c7d6b65a Mon Sep 17 00:00:00 2001 From: wangyang0918 Date: Wed, 6 Jan 2021 20:24:11 +0800 Subject: [PATCH] [FLINK-20798][k8s] Use namespaced kubernetes client when creating FlinkKubeClient After using namespaced kubernetes client, we will not need to always set the namespace when creating kubernetes resources(e.g. deployment, pods, configmap, watch, etc.). Address comments: Update the unit test to verify the configmap watch is created in appropriate namespace This closes #14570. --- .../kubeclient/DefaultKubeClientFactory.java | 8 ++- .../kubeclient/Fabric8FlinkKubeClient.java | 55 ++++--------------- .../resources/KubernetesLeaderElector.java | 3 +- .../flink/kubernetes/KubernetesTestBase.java | 37 ++++++++++++- .../kubernetes/MixedKubernetesServer.java | 17 ++++-- .../Fabric8FlinkKubeClientTest.java | 24 ++++++++ .../kubeclient/TestingFlinkKubeClient.java | 5 +- .../resources/NoOpWatchCallbackHandler.java | 55 +++++++++++++++++++ 8 files changed, 147 insertions(+), 57 deletions(-) create mode 100644 flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java index 59f157ae245..e7b57595d50 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java @@ -25,8 +25,8 @@ import org.apache.flink.util.FileUtils; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +83,11 @@ public class DefaultKubeClientFactory implements KubeClientFactory { config = Config.autoConfigure(kubeContext); } - final KubernetesClient client = new DefaultKubernetesClient(config); + final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); + LOG.debug("Setting namespace of Kubernetes client to {}", namespace); + config.setNamespace(namespace); + + final NamespacedKubernetesClient client = new DefaultKubernetesClient(config); return new Fabric8FlinkKubeClient(flinkConfig, client, () -> ioExecutor); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index bd2943c9157..779b3698745 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -44,7 +44,6 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.ServicePort; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import org.slf4j.Logger; @@ -69,7 +68,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class); - private final KubernetesClient internalClient; + private final NamespacedKubernetesClient internalClient; private final String clusterId; private final String namespace; private final int maxRetryAttempts; @@ -78,7 +77,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { public Fabric8FlinkKubeClient( Configuration flinkConfig, - KubernetesClient client, + NamespacedKubernetesClient client, Supplier asyncExecutorFactory) { this.internalClient = checkNotNull(client); this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)); @@ -100,19 +99,12 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { // create Deployment LOG.debug("Start to create deployment with spec {}", deployment.getSpec().toString()); final Deployment createdDeployment = - this.internalClient - .apps() - .deployments() - .inNamespace(this.namespace) - .create(deployment); + this.internalClient.apps().deployments().create(deployment); // Note that we should use the uid of the created Deployment for the OwnerReference. setOwnerReference(createdDeployment, accompanyingResources); - this.internalClient - .resourceList(accompanyingResources) - .inNamespace(this.namespace) - .createOrReplace(); + this.internalClient.resourceList(accompanyingResources).createOrReplace(); } @Override @@ -123,7 +115,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { this.internalClient .apps() .deployments() - .inNamespace(this.namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)) .get(); @@ -146,10 +137,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { kubernetesPod.getInternalResource().getMetadata(), kubernetesPod.getInternalResource().getSpec()); - this.internalClient - .pods() - .inNamespace(this.namespace) - .create(kubernetesPod.getInternalResource()); + this.internalClient.pods().create(kubernetesPod.getInternalResource()); }, kubeClientExecutorService); } @@ -201,7 +189,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { this.internalClient .apps() .deployments() - .inNamespace(this.namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); @@ -217,12 +204,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { final String serviceName = ExternalServiceDecorator.getExternalServiceName(clusterId); final Service service = - this.internalClient - .services() - .inNamespace(namespace) - .withName(serviceName) - .fromServer() - .get(); + this.internalClient.services().withName(serviceName).fromServer().get(); if (service == null) { LOG.debug("Service {} does not exist", serviceName); @@ -247,10 +229,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { KubernetesLeaderElectionConfiguration leaderElectionConfiguration, KubernetesLeaderElector.LeaderCallbackHandler leaderCallbackHandler) { return new KubernetesLeaderElector( - (NamespacedKubernetesClient) this.internalClient, - namespace, - leaderElectionConfiguration, - leaderCallbackHandler); + this.internalClient, leaderElectionConfiguration, leaderCallbackHandler); } @Override @@ -260,7 +239,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { () -> this.internalClient .configMaps() - .inNamespace(namespace) .create(configMap.getInternalResource()), kubeClientExecutorService) .exceptionally( @@ -274,8 +252,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { @Override public Optional getConfigMap(String name) { - final ConfigMap configMap = - this.internalClient.configMaps().inNamespace(namespace).withName(name).get(); + final ConfigMap configMap = this.internalClient.configMaps().withName(name).get(); return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap)); @@ -299,8 +276,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { this .internalClient .configMaps() - .inNamespace( - namespace) .withName( configMapName) .lockResourceVersion( @@ -353,24 +328,14 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { @Override public CompletableFuture deleteConfigMapsByLabels(Map labels) { return CompletableFuture.runAsync( - () -> - this.internalClient - .configMaps() - .inNamespace(namespace) - .withLabels(labels) - .delete(), + () -> this.internalClient.configMaps().withLabels(labels).delete(), kubeClientExecutorService); } @Override public CompletableFuture deleteConfigMap(String configMapName) { return CompletableFuture.runAsync( - () -> - this.internalClient - .configMaps() - .inNamespace(namespace) - .withName(configMapName) - .delete(), + () -> this.internalClient.configMaps().withName(configMapName).delete(), kubeClientExecutorService); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java index abdf0e21929..82a0bf9cfb6 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java @@ -61,7 +61,6 @@ public class KubernetesLeaderElector { public KubernetesLeaderElector( NamespacedKubernetesClient kubernetesClient, - String namespace, KubernetesLeaderElectionConfiguration leaderConfig, LeaderCallbackHandler leaderCallbackHandler) { final LeaderElectionConfig leaderElectionConfig = @@ -70,7 +69,7 @@ public class KubernetesLeaderElector { .withLeaseDuration(leaderConfig.getLeaseDuration()) .withLock( new ConfigMapLock( - namespace, + kubernetesClient.getNamespace(), leaderConfig.getConfigMapName(), leaderConfig.getLockIdentity())) .withRenewDeadline(leaderConfig.getRenewDeadline()) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java index 7f452ee48f8..4b9b26ce07b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java @@ -31,7 +31,12 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.util.TestLogger; -import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.api.model.Config; +import io.fabric8.kubernetes.api.model.ConfigBuilder; +import io.fabric8.kubernetes.api.model.NamedClusterBuilder; +import io.fabric8.kubernetes.api.model.NamedContextBuilder; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.utils.Serialization; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -66,7 +71,7 @@ public class KubernetesTestBase extends TestLogger { protected final Configuration flinkConfig = new Configuration(); - protected KubernetesClient kubeClient; + protected NamespacedKubernetesClient kubeClient; protected FlinkKubeClient flinkKubeClient; @@ -132,4 +137,32 @@ public class KubernetesTestBase extends TestLogger { KubernetesTestUtils.createTemporyFile("some keytab", kerberosDir, KEYTAB_FILE); KubernetesTestUtils.createTemporyFile("some conf", kerberosDir, KRB5_CONF_FILE); } + + protected String writeKubeConfigForMockKubernetesServer() throws Exception { + final Config kubeConfig = + new ConfigBuilder() + .withApiVersion(server.getClient().getApiVersion()) + .withClusters( + new NamedClusterBuilder() + .withName(CLUSTER_ID) + .withNewCluster() + .withNewServer(server.getClient().getMasterUrl().toString()) + .withInsecureSkipTlsVerify(true) + .endCluster() + .build()) + .withContexts( + new NamedContextBuilder() + .withName(CLUSTER_ID) + .withNewContext() + .withCluster(CLUSTER_ID) + .withUser( + server.getClient().getConfiguration().getUsername()) + .endContext() + .build()) + .withNewCurrentContext(CLUSTER_ID) + .build(); + final File kubeConfigFile = new File(temporaryFolder.newFolder(".kube"), "config"); + Serialization.yamlMapper().writeValue(kubeConfigFile, kubeConfig); + return kubeConfigFile.getAbsolutePath(); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java index 57b9a9d694b..acdf697c04e 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java @@ -25,23 +25,28 @@ import io.fabric8.mockwebserver.ServerRequest; import io.fabric8.mockwebserver.ServerResponse; import io.fabric8.mockwebserver.dsl.MockServerExpectation; import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; import org.junit.rules.ExternalResource; import java.util.HashMap; import java.util.Queue; +import java.util.concurrent.TimeUnit; /** The mock server that host MixedDispatcher. */ public class MixedKubernetesServer extends ExternalResource { private KubernetesMockServer mock; private NamespacedKubernetesClient client; - private boolean https; + private final boolean https; - private boolean crudMode; + private final boolean crudMode; + + private final MockWebServer mockWebServer; public MixedKubernetesServer(boolean https, boolean crudMode) { this.https = https; this.crudMode = crudMode; + mockWebServer = new MockWebServer(); } public void before() { @@ -50,11 +55,11 @@ public class MixedKubernetesServer extends ExternalResource { crudMode ? new KubernetesMockServer( new Context(), - new MockWebServer(), + mockWebServer, response, new MixedDispatcher(response), true) - : new KubernetesMockServer(https); + : new KubernetesMockServer(mockWebServer, response, https); mock.init(); client = mock.createClient(); } @@ -68,6 +73,10 @@ public class MixedKubernetesServer extends ExternalResource { return client; } + public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception { + return mockWebServer.takeRequest(timeout, unit); + } + public MockServerExpectation expect() { return mock.expect(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index 3c49e037004..b0195efad20 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -33,6 +33,8 @@ import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactor import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler; +import org.apache.flink.runtime.rest.HttpMethodWrapper; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; @@ -42,6 +44,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.apps.Deployment; +import okhttp3.mockwebserver.RecordedRequest; import org.junit.Test; import java.util.HashMap; @@ -50,6 +53,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; @@ -64,6 +68,7 @@ import static org.junit.Assert.fail; /** Tests for Fabric implementation of {@link FlinkKubeClient}. */ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase { + private static final long TIMEOUT = 10 * 1000; private static final int RPC_PORT = 7123; private static final int BLOB_SERVER_PORT = 8346; @@ -456,6 +461,25 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase { } } + @Test + public void testWatchConfigMaps() throws Exception { + final String kubeConfigFile = writeKubeConfigForMockKubernetesServer(); + flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, kubeConfigFile); + + final FlinkKubeClient realFlinkKubeClient = + DefaultKubeClientFactory.getInstance().fromConfiguration(flinkConfig); + realFlinkKubeClient.watchConfigMaps(CLUSTER_ID, new NoOpWatchCallbackHandler<>()); + final String path = + "/api/v1/namespaces/" + + NAMESPACE + + "/configmaps?fieldSelector=metadata.name%3D" + + CLUSTER_ID + + "&watch=true"; + final RecordedRequest watchRequest = server.takeRequest(TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(watchRequest.getPath(), is(path)); + assertThat(watchRequest.getMethod(), is(HttpMethodWrapper.GET.toString())); + } + private KubernetesConfigMap buildTestingConfigMap() { final Map data = new HashMap<>(); data.put(TESTING_CONFIG_MAP_KEY, TESTING_CONFIG_MAP_VALUE); diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java index 5879dcd666b..c046613496c 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java @@ -27,6 +27,8 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.util.Preconditions; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; + import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -413,12 +415,11 @@ public class TestingFlinkKubeClient implements FlinkKubeClient { /** Testing implementation of {@link KubernetesLeaderElector}. */ public static class TestingKubernetesLeaderElector extends KubernetesLeaderElector { - private static final String NAMESPACE = "test"; public TestingKubernetesLeaderElector( KubernetesLeaderElectionConfiguration leaderConfig, LeaderCallbackHandler leaderCallbackHandler) { - super(null, NAMESPACE, leaderConfig, leaderCallbackHandler); + super(new KubernetesMockServer().createClient(), leaderConfig, leaderCallbackHandler); } @Override diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java new file mode 100644 index 00000000000..6e64a4d09e2 --- /dev/null +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.resources; + +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; + +import java.util.List; + +/** + * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}. + * + * @param Type of resource to be watched + */ +public class NoOpWatchCallbackHandler implements FlinkKubeClient.WatchCallbackHandler { + @Override + public void onAdded(List resources) { + // noop + } + + @Override + public void onModified(List resources) { + // noop + } + + @Override + public void onDeleted(List resources) { + // noop + } + + @Override + public void onError(List resources) { + // noop + } + + @Override + public void handleFatalError(Throwable throwable) { + // noop + } +} -- GitLab