未验证 提交 460e1799 编写于 作者: W wangyang0918 提交者: Till Rohrmann

[FLINK-20798][k8s] Use namespaced kubernetes client when creating FlinkKubeClient

After using namespaced kubernetes client, we will not need to always set the namespace when creating kubernetes resources(e.g. deployment, pods, configmap, watch, etc.).

Address comments: Update the unit test to verify the configmap watch is created in appropriate namespace

This closes #14570.
上级 e98d251a
......@@ -25,8 +25,8 @@ import org.apache.flink.util.FileUtils;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -83,7 +83,11 @@ public class DefaultKubeClientFactory implements KubeClientFactory {
config = Config.autoConfigure(kubeContext);
}
final KubernetesClient client = new DefaultKubernetesClient(config);
final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
config.setNamespace(namespace);
final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
return new Fabric8FlinkKubeClient(flinkConfig, client, () -> ioExecutor);
}
......
......@@ -44,7 +44,6 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
......@@ -69,7 +68,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
private final KubernetesClient internalClient;
private final NamespacedKubernetesClient internalClient;
private final String clusterId;
private final String namespace;
private final int maxRetryAttempts;
......@@ -78,7 +77,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
public Fabric8FlinkKubeClient(
Configuration flinkConfig,
KubernetesClient client,
NamespacedKubernetesClient client,
Supplier<Executor> asyncExecutorFactory) {
this.internalClient = checkNotNull(client);
this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
......@@ -100,19 +99,12 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
// create Deployment
LOG.debug("Start to create deployment with spec {}", deployment.getSpec().toString());
final Deployment createdDeployment =
this.internalClient
.apps()
.deployments()
.inNamespace(this.namespace)
.create(deployment);
this.internalClient.apps().deployments().create(deployment);
// Note that we should use the uid of the created Deployment for the OwnerReference.
setOwnerReference(createdDeployment, accompanyingResources);
this.internalClient
.resourceList(accompanyingResources)
.inNamespace(this.namespace)
.createOrReplace();
this.internalClient.resourceList(accompanyingResources).createOrReplace();
}
@Override
......@@ -123,7 +115,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
this.internalClient
.apps()
.deployments()
.inNamespace(this.namespace)
.withName(KubernetesUtils.getDeploymentName(clusterId))
.get();
......@@ -146,10 +137,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
kubernetesPod.getInternalResource().getMetadata(),
kubernetesPod.getInternalResource().getSpec());
this.internalClient
.pods()
.inNamespace(this.namespace)
.create(kubernetesPod.getInternalResource());
this.internalClient.pods().create(kubernetesPod.getInternalResource());
},
kubeClientExecutorService);
}
......@@ -201,7 +189,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
this.internalClient
.apps()
.deployments()
.inNamespace(this.namespace)
.withName(KubernetesUtils.getDeploymentName(clusterId))
.cascading(true)
.delete();
......@@ -217,12 +204,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
final String serviceName = ExternalServiceDecorator.getExternalServiceName(clusterId);
final Service service =
this.internalClient
.services()
.inNamespace(namespace)
.withName(serviceName)
.fromServer()
.get();
this.internalClient.services().withName(serviceName).fromServer().get();
if (service == null) {
LOG.debug("Service {} does not exist", serviceName);
......@@ -247,10 +229,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
KubernetesLeaderElector.LeaderCallbackHandler leaderCallbackHandler) {
return new KubernetesLeaderElector(
(NamespacedKubernetesClient) this.internalClient,
namespace,
leaderElectionConfiguration,
leaderCallbackHandler);
this.internalClient, leaderElectionConfiguration, leaderCallbackHandler);
}
@Override
......@@ -260,7 +239,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
() ->
this.internalClient
.configMaps()
.inNamespace(namespace)
.create(configMap.getInternalResource()),
kubeClientExecutorService)
.exceptionally(
......@@ -274,8 +252,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
public Optional<KubernetesConfigMap> getConfigMap(String name) {
final ConfigMap configMap =
this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
final ConfigMap configMap = this.internalClient.configMaps().withName(name).get();
return configMap == null
? Optional.empty()
: Optional.of(new KubernetesConfigMap(configMap));
......@@ -299,8 +276,6 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
this
.internalClient
.configMaps()
.inNamespace(
namespace)
.withName(
configMapName)
.lockResourceVersion(
......@@ -353,24 +328,14 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
public CompletableFuture<Void> deleteConfigMapsByLabels(Map<String, String> labels) {
return CompletableFuture.runAsync(
() ->
this.internalClient
.configMaps()
.inNamespace(namespace)
.withLabels(labels)
.delete(),
() -> this.internalClient.configMaps().withLabels(labels).delete(),
kubeClientExecutorService);
}
@Override
public CompletableFuture<Void> deleteConfigMap(String configMapName) {
return CompletableFuture.runAsync(
() ->
this.internalClient
.configMaps()
.inNamespace(namespace)
.withName(configMapName)
.delete(),
() -> this.internalClient.configMaps().withName(configMapName).delete(),
kubeClientExecutorService);
}
......
......@@ -61,7 +61,6 @@ public class KubernetesLeaderElector {
public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
String namespace,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler) {
final LeaderElectionConfig leaderElectionConfig =
......@@ -70,7 +69,7 @@ public class KubernetesLeaderElector {
.withLeaseDuration(leaderConfig.getLeaseDuration())
.withLock(
new ConfigMapLock(
namespace,
kubernetesClient.getNamespace(),
leaderConfig.getConfigMapName(),
leaderConfig.getLockIdentity()))
.withRenewDeadline(leaderConfig.getRenewDeadline())
......
......@@ -31,7 +31,12 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.util.TestLogger;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.api.model.Config;
import io.fabric8.kubernetes.api.model.ConfigBuilder;
import io.fabric8.kubernetes.api.model.NamedClusterBuilder;
import io.fabric8.kubernetes.api.model.NamedContextBuilder;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
......@@ -66,7 +71,7 @@ public class KubernetesTestBase extends TestLogger {
protected final Configuration flinkConfig = new Configuration();
protected KubernetesClient kubeClient;
protected NamespacedKubernetesClient kubeClient;
protected FlinkKubeClient flinkKubeClient;
......@@ -132,4 +137,32 @@ public class KubernetesTestBase extends TestLogger {
KubernetesTestUtils.createTemporyFile("some keytab", kerberosDir, KEYTAB_FILE);
KubernetesTestUtils.createTemporyFile("some conf", kerberosDir, KRB5_CONF_FILE);
}
protected String writeKubeConfigForMockKubernetesServer() throws Exception {
final Config kubeConfig =
new ConfigBuilder()
.withApiVersion(server.getClient().getApiVersion())
.withClusters(
new NamedClusterBuilder()
.withName(CLUSTER_ID)
.withNewCluster()
.withNewServer(server.getClient().getMasterUrl().toString())
.withInsecureSkipTlsVerify(true)
.endCluster()
.build())
.withContexts(
new NamedContextBuilder()
.withName(CLUSTER_ID)
.withNewContext()
.withCluster(CLUSTER_ID)
.withUser(
server.getClient().getConfiguration().getUsername())
.endContext()
.build())
.withNewCurrentContext(CLUSTER_ID)
.build();
final File kubeConfigFile = new File(temporaryFolder.newFolder(".kube"), "config");
Serialization.yamlMapper().writeValue(kubeConfigFile, kubeConfig);
return kubeConfigFile.getAbsolutePath();
}
}
......@@ -25,23 +25,28 @@ import io.fabric8.mockwebserver.ServerRequest;
import io.fabric8.mockwebserver.ServerResponse;
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.rules.ExternalResource;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
/** The mock server that host MixedDispatcher. */
public class MixedKubernetesServer extends ExternalResource {
private KubernetesMockServer mock;
private NamespacedKubernetesClient client;
private boolean https;
private final boolean https;
private boolean crudMode;
private final boolean crudMode;
private final MockWebServer mockWebServer;
public MixedKubernetesServer(boolean https, boolean crudMode) {
this.https = https;
this.crudMode = crudMode;
mockWebServer = new MockWebServer();
}
public void before() {
......@@ -50,11 +55,11 @@ public class MixedKubernetesServer extends ExternalResource {
crudMode
? new KubernetesMockServer(
new Context(),
new MockWebServer(),
mockWebServer,
response,
new MixedDispatcher(response),
true)
: new KubernetesMockServer(https);
: new KubernetesMockServer(mockWebServer, response, https);
mock.init();
client = mock.createClient();
}
......@@ -68,6 +73,10 @@ public class MixedKubernetesServer extends ExternalResource {
return client;
}
public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws Exception {
return mockWebServer.takeRequest(timeout, unit);
}
public MockServerExpectation expect() {
return mock.expect();
}
......
......@@ -33,6 +33,8 @@ import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactor
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
......@@ -42,6 +44,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.Test;
import java.util.HashMap;
......@@ -50,6 +53,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
......@@ -64,6 +68,7 @@ import static org.junit.Assert.fail;
/** Tests for Fabric implementation of {@link FlinkKubeClient}. */
public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
private static final long TIMEOUT = 10 * 1000;
private static final int RPC_PORT = 7123;
private static final int BLOB_SERVER_PORT = 8346;
......@@ -456,6 +461,25 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
}
}
@Test
public void testWatchConfigMaps() throws Exception {
final String kubeConfigFile = writeKubeConfigForMockKubernetesServer();
flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, kubeConfigFile);
final FlinkKubeClient realFlinkKubeClient =
DefaultKubeClientFactory.getInstance().fromConfiguration(flinkConfig);
realFlinkKubeClient.watchConfigMaps(CLUSTER_ID, new NoOpWatchCallbackHandler<>());
final String path =
"/api/v1/namespaces/"
+ NAMESPACE
+ "/configmaps?fieldSelector=metadata.name%3D"
+ CLUSTER_ID
+ "&watch=true";
final RecordedRequest watchRequest = server.takeRequest(TIMEOUT, TimeUnit.MILLISECONDS);
assertThat(watchRequest.getPath(), is(path));
assertThat(watchRequest.getMethod(), is(HttpMethodWrapper.GET.toString()));
}
private KubernetesConfigMap buildTestingConfigMap() {
final Map<String, String> data = new HashMap<>();
data.put(TESTING_CONFIG_MAP_KEY, TESTING_CONFIG_MAP_VALUE);
......
......@@ -27,6 +27,8 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
......@@ -413,12 +415,11 @@ public class TestingFlinkKubeClient implements FlinkKubeClient {
/** Testing implementation of {@link KubernetesLeaderElector}. */
public static class TestingKubernetesLeaderElector extends KubernetesLeaderElector {
private static final String NAMESPACE = "test";
public TestingKubernetesLeaderElector(
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler) {
super(null, NAMESPACE, leaderConfig, leaderCallbackHandler);
super(new KubernetesMockServer().createClient(), leaderConfig, leaderCallbackHandler);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.kubeclient.resources;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import java.util.List;
/**
* Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}.
*
* @param <T> Type of resource to be watched
*/
public class NoOpWatchCallbackHandler<T> implements FlinkKubeClient.WatchCallbackHandler<T> {
@Override
public void onAdded(List<T> resources) {
// noop
}
@Override
public void onModified(List<T> resources) {
// noop
}
@Override
public void onDeleted(List<T> resources) {
// noop
}
@Override
public void onError(List<T> resources) {
// noop
}
@Override
public void handleFatalError(Throwable throwable) {
// noop
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册