Commit 577abe7f authored by wangyang0918, committed by Andrey Zagrebin


[FLINK-15790][k8s] Make some interfaces in FlinkKubeClient asynchronous which potentially blocks the execution of RpcEndpoint's main thread

The interfaces in FlinkKubeClient are called both on the client side and in the ResourceManager. To avoid potentially blocking the execution of the RpcEndpoint's main thread, the interfaces #createTaskManagerPod and #stopPod are now implemented asynchronously.

This closes #11427.
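
For context, a minimal sketch of the pattern this commit introduces is shown below: the kube client wraps each blocking Kubernetes API call in a CompletableFuture executed on a dedicated I/O thread pool, and the caller (the ResourceManager's RpcEndpoint) only attaches a completion callback instead of waiting. The class, field, and helper names here (AsyncKubeClientSketch, ioExecutor, blockingCreate) are illustrative stand-ins, not a verbatim excerpt of the patch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: the real implementation lives in Fabric8FlinkKubeClient / KubernetesResourceManager.
class AsyncKubeClientSketch {
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);

    CompletableFuture<Void> createTaskManagerPod(String podName) {
        // The blocking Kubernetes API call runs on a dedicated I/O pool,
        // so the RpcEndpoint's main thread is never blocked.
        return CompletableFuture.runAsync(() -> blockingCreate(podName), ioExecutor);
    }

    private void blockingCreate(String podName) {
        // Stand-in for internalClient.pods().inNamespace(namespace).create(...).
    }

    void requestPod(String podName) {
        // Caller side: attach a callback instead of blocking on the result.
        createTaskManagerPod(podName).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // In the real ResourceManager the retry is re-scheduled onto the main thread.
                System.err.println("Could not start TaskManager in pod " + podName);
            }
        });
    }
}
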
Parent f4a062aa
......@@ -51,6 +51,8 @@ import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
......@@ -85,11 +87,11 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
return () -> {
final Configuration configuration = new Configuration(flinkConfig);
final Endpoint restEndpoint = client.getRestEndpoint(clusterId);
final Optional<Endpoint> restEndpoint = client.getRestEndpoint(clusterId);
if (restEndpoint != null) {
configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress());
configuration.setInteger(RestOptions.PORT, restEndpoint.getPort());
if (restEndpoint.isPresent()) {
configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress());
configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort());
} else {
throw new RuntimeException(
new ClusterRetrieveException(
......
......@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
......@@ -86,6 +86,8 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
private final ContaineredTaskManagerParameters taskManagerParameters;
private final KubernetesResourceManagerConfiguration configuration;
/** The number of pods requested, but not yet granted. */
private int numPendingPodRequests = 0;
......@@ -101,7 +103,9 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup) {
ResourceManagerMetricGroup resourceManagerMetricGroup,
FlinkKubeClient kubeClient,
KubernetesResourceManagerConfiguration configuration) {
super(
flinkConfig,
System.getenv(),
......@@ -116,13 +120,14 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup);
this.clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
this.clusterId = configuration.getClusterId();
this.defaultCpus = taskExecutorProcessSpec.getCpuCores().getValue().doubleValue();
this.kubeClient = createFlinkKubeClient();
this.kubeClient = kubeClient;
this.taskManagerParameters =
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec, numSlotsPerTaskManager);
this.configuration = configuration;
}
@Override
......@@ -179,12 +184,7 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
public boolean stopWorker(final KubernetesWorkerNode worker) {
LOG.info("Stopping Worker {}.", worker.getResourceID());
workerNodes.remove(worker.getResourceID());
try {
kubeClient.stopPod(worker.getResourceID().toString());
} catch (Exception e) {
kubeClient.handleException(e);
return false;
}
internalStopPod(worker.getResourceID().toString());
return true;
}
......@@ -268,7 +268,23 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
kubeClient.createTaskManagerPod(taskManagerPod);
kubeClient.createTaskManagerPod(taskManagerPod)
.whenComplete(
(ignore, throwable) -> {
if (throwable != null) {
log.error("Could not start TaskManager in pod {}.", podName, throwable);
scheduleRunAsync(
this::decreasePendingAndRequestKubernetesPodIfRequired,
configuration.getPodCreationRetryInterval());
}
}
);
}
private void decreasePendingAndRequestKubernetesPodIfRequired() {
validateRunsInMainThread();
numPendingPodRequests--;
requestKubernetesPodIfRequired();
}
/**
......@@ -285,7 +301,7 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
kubeClient.stopPod(pod.getName());
internalStopPod(pod.getName());
final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(new ResourceID(pod.getName()));
if (kubernetesWorkerNode != null) {
requestKubernetesPodIfRequired();
......@@ -293,12 +309,19 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
}
}
protected FlinkKubeClient createFlinkKubeClient() {
return KubeClientFactory.fromConfiguration(flinkConfig);
}
@Override
protected double getCpuCores(Configuration configuration) {
return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
}
private void internalStopPod(String podName) {
kubeClient.stopPod(podName)
.whenComplete(
(ignore, throwable) -> {
if (throwable != null) {
log.error("Could not stop TaskManager in pod {}.", podName, throwable);
}
}
);
}
}
......@@ -101,7 +101,7 @@ public class KubernetesSessionCli {
final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
// Retrieve or create a session cluster.
if (clusterId != null && kubeClient.getRestService(clusterId) != null) {
if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
} else {
clusterClient = kubernetesClusterDescriptor
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.configuration;
import org.apache.flink.api.common.time.Time;
/**
* Configuration specific to {@link org.apache.flink.kubernetes.KubernetesResourceManager}.
*/
public class KubernetesResourceManagerConfiguration {
private final String clusterId;
private final Time podCreationRetryInterval;
public KubernetesResourceManagerConfiguration(String clusterId, Time podCreationRetryInterval) {
this.clusterId = clusterId;
this.podCreationRetryInterval = podCreationRetryInterval;
}
public String getClusterId() {
return clusterId;
}
public Time getPodCreationRetryInterval() {
return podCreationRetryInterval;
}
}
......@@ -18,9 +18,13 @@
package org.apache.flink.kubernetes.entrypoint;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.KubernetesResourceManager;
import org.apache.flink.kubernetes.KubernetesWorkerNode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
......@@ -44,6 +48,8 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto
private static final KubernetesResourceManagerFactory INSTANCE = new KubernetesResourceManagerFactory();
private static final Time POD_CREATION_RETRY_INTERVAL = Time.seconds(3L);
private KubernetesResourceManagerFactory() {}
public static KubernetesResourceManagerFactory getInstance() {
......@@ -67,6 +73,10 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto
rmServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());
final KubernetesResourceManagerConfiguration kubernetesResourceManagerConfiguration =
new KubernetesResourceManagerConfiguration(
configuration.getString(KubernetesConfigOptions.CLUSTER_ID),
POD_CREATION_RETRY_INTERVAL);
return new KubernetesResourceManager(
rpcService,
......@@ -80,6 +90,8 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto
rmRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup);
resourceManagerMetricGroup,
KubeClientFactory.fromConfiguration(configuration),
kubernetesResourceManagerConfiguration);
}
}
......@@ -24,8 +24,10 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.ExecutorUtils;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
......@@ -38,12 +40,15 @@ import io.fabric8.kubernetes.client.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -55,17 +60,22 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
private final Configuration flinkConfig;
private final KubernetesClient internalClient;
private final String clusterId;
private final String nameSpace;
public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client) {
this.flinkConfig = checkNotNull(flinkConfig);
private final ExecutorService kubeClientExecutorService;
public Fabric8FlinkKubeClient(
Configuration flinkConfig,
KubernetesClient client,
Supplier<ExecutorService> asyncExecutorFactory) {
this.internalClient = checkNotNull(client);
this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
this.kubeClientExecutorService = asyncExecutorFactory.get();
}
@Override
......@@ -91,45 +101,50 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
}
@Override
public void createTaskManagerPod(KubernetesPod kubernetesPod) {
final Deployment masterDeployment = this.internalClient
.apps()
.deployments()
.inNamespace(this.nameSpace)
.withName(KubernetesUtils.getDeploymentName(clusterId))
.get();
if (masterDeployment == null) {
throw new RuntimeException(
"Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace);
}
public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
return CompletableFuture.runAsync(
() -> {
final Deployment masterDeployment = this.internalClient
.apps()
.deployments()
.inNamespace(this.nameSpace)
.withName(KubernetesUtils.getDeploymentName(clusterId))
.get();
if (masterDeployment == null) {
throw new RuntimeException(
"Failed to find Deployment named " + clusterId + " in namespace " + this.nameSpace);
}
// Note that we should use the uid of the master Deployment for the OwnerReference.
setOwnerReference(masterDeployment, Collections.singletonList(kubernetesPod.getInternalResource()));
// Note that we should use the uid of the master Deployment for the OwnerReference.
setOwnerReference(masterDeployment, Collections.singletonList(kubernetesPod.getInternalResource()));
LOG.debug("Start to create pod with metadata {}, spec {}",
kubernetesPod.getInternalResource().getMetadata(),
kubernetesPod.getInternalResource().getSpec());
LOG.debug("Start to create pod with metadata {}, spec {}",
kubernetesPod.getInternalResource().getMetadata(),
kubernetesPod.getInternalResource().getSpec());
this.internalClient
.pods()
.inNamespace(this.nameSpace)
.create(kubernetesPod.getInternalResource());
this.internalClient
.pods()
.inNamespace(this.nameSpace)
.create(kubernetesPod.getInternalResource());
},
kubeClientExecutorService);
}
@Override
public void stopPod(String podName) {
this.internalClient.pods().withName(podName).delete();
public CompletableFuture<Void> stopPod(String podName) {
return CompletableFuture.runAsync(
() -> this.internalClient.pods().withName(podName).delete(),
kubeClientExecutorService);
}
@Override
@Nullable
public Endpoint getRestEndpoint(String clusterId) {
final KubernetesService restService = getRestService(clusterId);
if (restService == null) {
return null;
public Optional<Endpoint> getRestEndpoint(String clusterId) {
Optional<KubernetesService> restService = getRestService(clusterId);
if (!restService.isPresent()) {
return Optional.empty();
}
final Service service = restService.getInternalResource();
final Service service = restService.get().getInternalResource();
final int restPort = getRestPortFromExternalService(service);
final KubernetesConfigOptions.ServiceExposedType serviceExposedType =
......@@ -137,35 +152,18 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
// Return the service.namespace directly when use ClusterIP.
if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
return new Endpoint(KubernetesUtils.getInternalServiceName(clusterId) + "." + nameSpace, restPort);
return Optional.of(
new Endpoint(KubernetesUtils.getInternalServiceName(clusterId) + "." + nameSpace, restPort));
}
String address = null;
if (service.getStatus() != null && (service.getStatus().getLoadBalancer() != null ||
service.getStatus().getLoadBalancer().getIngress() != null)) {
if (service.getStatus().getLoadBalancer().getIngress().size() > 0) {
address = service.getStatus().getLoadBalancer().getIngress().get(0).getIp();
if (address == null || address.isEmpty()) {
address = service.getStatus().getLoadBalancer().getIngress().get(0).getHostname();
}
} else {
address = this.internalClient.getMasterUrl().getHost();
}
} else if (service.getSpec().getExternalIPs() != null && service.getSpec().getExternalIPs().size() > 0) {
address = service.getSpec().getExternalIPs().get(0);
}
if (address == null || address.isEmpty()) {
return null;
}
return new Endpoint(address, restPort);
return getRestEndPointFromService(service, restPort);
}
@Override
public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
if (podList == null || podList.size() < 1) {
if (podList == null || podList.isEmpty()) {
return new ArrayList<>();
}
......@@ -192,8 +190,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
}
@Override
@Nullable
public KubernetesService getRestService(String clusterId) {
public Optional<KubernetesService> getRestService(String clusterId) {
final String serviceName = KubernetesUtils.getRestServiceName(clusterId);
final Service service = this.internalClient
......@@ -205,10 +202,10 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
if (service == null) {
LOG.debug("Service {} does not exist", serviceName);
return null;
return Optional.empty();
}
return new KubernetesService(service);
return Optional.of(new KubernetesService(service));
}
@Override
......@@ -247,6 +244,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
public void close() {
this.internalClient.close();
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.kubeClientExecutorService);
}
private void setOwnerReference(Deployment deployment, List<HasMetadata> resources) {
......@@ -291,4 +289,41 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
throw new RuntimeException("Unrecognized Service type: " + externalServiceType);
}
}
private Optional<Endpoint> getRestEndPointFromService(Service service, int restPort) {
if (service.getStatus() == null) {
return Optional.empty();
}
LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer();
boolean hasExternalIP = service.getSpec() != null &&
service.getSpec().getExternalIPs() != null && !service.getSpec().getExternalIPs().isEmpty();
if (loadBalancer != null) {
return getLoadBalancerRestEndpoint(loadBalancer, restPort);
} else if (hasExternalIP) {
final String address = service.getSpec().getExternalIPs().get(0);
if (address != null && !address.isEmpty()) {
return Optional.of(new Endpoint(address, restPort));
}
}
return Optional.empty();
}
private Optional<Endpoint> getLoadBalancerRestEndpoint(LoadBalancerStatus loadBalancer, int restPort) {
boolean hasIngress = loadBalancer.getIngress() != null && !loadBalancer.getIngress().isEmpty();
String address;
if (hasIngress) {
address = loadBalancer.getIngress().get(0).getIp();
// Use hostname when the ip address is null
if (address == null || address.isEmpty()) {
address = loadBalancer.getIngress().get(0).getHostname();
}
} else {
// Use node port
address = this.internalClient.getMasterUrl().getHost();
}
boolean noAddress = address == null || address.isEmpty();
return noAddress ? Optional.empty() : Optional.of(new Endpoint(address, restPort));
}
}
......@@ -21,33 +21,40 @@ package org.apache.flink.kubernetes.kubeclient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
* The client to talk with kubernetes.
* The client to talk with kubernetes. The interfaces will be called both in Client and ResourceManager. To avoid
* potentially blocking the execution of RpcEndpoint's main thread, these interfaces
* {@link #createTaskManagerPod(KubernetesPod)}, {@link #stopPod(String)} should be implemented asynchronously.
*/
public interface FlinkKubeClient extends AutoCloseable {
/**
* Create the Master components, this can include the Deployment, the ConfigMap(s), and the Service(s).
*
* @param kubernetesJMSpec jobmanager specification
*/
void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec);
/**
* Create task manager pod.
*
* @param kubernetesPod taskmanager pod
* @return Return the taskmanager pod creation future
*/
void createTaskManagerPod(KubernetesPod kubernetesPod);
CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod);
/**
* Stop a specified pod by name.
*
* @param podName pod name
* @return Return the pod stop future
*/
void stopPod(String podName);
CompletableFuture<Void> stopPod(String podName);
/**
* Stop cluster and clean up all resources, include services, auxiliary services and all running pods.
......@@ -60,19 +67,17 @@ public interface FlinkKubeClient extends AutoCloseable {
* Get the kubernetes rest service of the given flink clusterId.
*
* @param clusterId cluster id
* @return Return the rest service of the specified cluster id. Return null if the service does not exist.
* @return Return the optional rest service of the specified cluster id.
*/
@Nullable
KubernetesService getRestService(String clusterId);
Optional<KubernetesService> getRestService(String clusterId);
/**
* Get the rest endpoint for access outside cluster.
*
* @param clusterId cluster id
* @return Return null if the service does not exist or could not extract the Endpoint from the service.
* @return Return empty if the service does not exist or could not extract the Endpoint from the service.
*/
@Nullable
Endpoint getRestEndpoint(String clusterId);
Optional<Endpoint> getRestEndpoint(String clusterId);
/**
* List the pods with specified labels.
......
......@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FileUtils;
import io.fabric8.kubernetes.client.Config;
......@@ -31,6 +32,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Factory class to create {@link FlinkKubeClient}.
......@@ -68,6 +71,10 @@ public class KubeClientFactory {
final KubernetesClient client = new DefaultKubernetesClient(config);
return new Fabric8FlinkKubeClient(flinkConfig, client);
return new Fabric8FlinkKubeClient(flinkConfig, client, KubeClientFactory::createThreadPoolForAsyncIO);
}
private static ExecutorService createThreadPoolForAsyncIO() {
return Executors.newFixedThreadPool(2, new ExecutorThreadFactory("FlinkKubeClient-IO"));
}
}
......@@ -23,7 +23,10 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.utils.Constants;
......@@ -32,6 +35,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
......@@ -77,6 +82,8 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static junit.framework.TestCase.assertEquals;
......@@ -92,6 +99,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
private static final Time TIMEOUT = Time.seconds(10L);
private static final String JOB_MANAGER_HOST = "jm-host1";
private static final Time TESTING_POD_CREATION_RETRY_INTERVAL = Time.milliseconds(50L);
private TestingFatalErrorHandler testingFatalErrorHandler;
......@@ -106,7 +114,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
testingFatalErrorHandler = new TestingFatalErrorHandler();
resourceManager = createAndStartResourceManager(flinkConfig);
resourceManager = createAndStartResourceManager(flinkConfig, flinkKubeClient);
final Deployment mockDeployment = new DeploymentBuilder()
.editOrNewMetadata()
......@@ -139,7 +147,9 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup) {
ResourceManagerMetricGroup resourceManagerMetricGroup,
FlinkKubeClient flinkKubeClient,
KubernetesResourceManagerConfiguration configuration) {
super(
rpcService,
resourceManagerEndpointId,
......@@ -152,7 +162,9 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
jobLeaderIdService,
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup
resourceManagerMetricGroup,
flinkKubeClient,
configuration
);
this.slotManager = slotManager;
}
......@@ -166,11 +178,6 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
runnable.run();
}
@Override
protected FlinkKubeClient createFlinkKubeClient() {
return flinkKubeClient;
}
MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}
......@@ -275,7 +282,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
.endSpec()
.build();
flinkKubeClient.createTaskManagerPod(new KubernetesPod(mockTaskManagerPod));
flinkKubeClient.createTaskManagerPod(new KubernetesPod(mockTaskManagerPod)).get();
assertEquals(1, kubeClient.pods().list().getItems().size());
// Call initialize method to recover worker nodes from previous attempt.
......@@ -323,7 +330,30 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
private TestingKubernetesResourceManager createAndStartResourceManager(Configuration configuration) throws Exception {
@Test
public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
final AtomicInteger retries = new AtomicInteger(0);
final int numOfFailedRetries = 3;
final OneShotLatch podCreated = new OneShotLatch();
final FlinkKubeClient flinkKubeClient =
createTestingFlinkKubeClientAllocatingPodsAfter(numOfFailedRetries, retries, podCreated);
final TestingKubernetesResourceManager testRM = createAndStartResourceManager(flinkConfig, flinkKubeClient);
registerSlotRequest(testRM);
podCreated.await();
// Creating taskmanager should retry 4 times (3 failed and then succeed)
assertThat(
"Creating taskmanager should fail " + numOfFailedRetries + " times and then succeed",
retries.get(),
is(numOfFailedRetries + 1));
testRM.close();
flinkKubeClient.close();
}
private TestingKubernetesResourceManager createAndStartResourceManager(
Configuration configuration,
FlinkKubeClient flinkKubeClient) throws Exception {
final TestingRpcService rpcService = new TestingRpcService(configuration);
final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(rpcService, TIMEOUT);
......@@ -339,14 +369,15 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
rmServices.jobLeaderIdService,
new ClusterInformation("localhost", 1234),
testingFatalErrorHandler,
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup()
);
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
flinkKubeClient,
new KubernetesResourceManagerConfiguration(CLUSTER_ID, TESTING_POD_CREATION_RETRY_INTERVAL));
kubernetesResourceManager.start();
rmServices.grantLeadership();
return kubernetesResourceManager;
}
private void registerSlotRequest() throws Exception {
private void registerSlotRequest(TestingKubernetesResourceManager resourceManager) throws Exception {
CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
resourceManager.getSlotManager().registerSlotRequest(
new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, JOB_MANAGER_HOST));
......@@ -355,6 +386,10 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
registerSlotRequestFuture.get();
}
private void registerSlotRequest() throws Exception {
registerSlotRequest(resourceManager);
}
private void registerTaskExecutor(ResourceID resourceID) throws Exception {
final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.createTestingTaskExecutorGateway();
......@@ -398,4 +433,21 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
.build())
.build());
}
private FlinkKubeClient createTestingFlinkKubeClientAllocatingPodsAfter(
int numberOfRetries,
AtomicInteger retries,
OneShotLatch podCreated) {
ExecutorService kubeClientExecutorService = Executors.newDirectExecutorService();
return new Fabric8FlinkKubeClient(flinkConfig, kubeClient, () -> kubeClientExecutorService) {
@Override
public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
if (retries.getAndIncrement() < numberOfRetries) {
return FutureUtils.completedExceptionally(new RuntimeException("Exception"));
}
podCreated.trigger();
return super.createTaskManagerPod(kubernetesPod);
}
};
}
}
......@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
......@@ -84,7 +85,7 @@ public class KubernetesTestBase extends TestLogger {
TestBaseUtils.setEnv(map);
kubeClient = server.getClient().inNamespace(NAMESPACE);
flinkKubeClient = new Fabric8FlinkKubeClient(flinkConfig, kubeClient);
flinkKubeClient = new Fabric8FlinkKubeClient(flinkConfig, kubeClient, Executors::newDirectExecutorService);
}
@After
......
......@@ -42,6 +42,8 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
......@@ -62,6 +64,8 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
private static final String SERVICE_ACCOUNT_NAME = "service-test";
private static final String TASKMANAGER_POD_NAME = "mock-task-manager-pod";
private static final String ENTRY_POINT_CLASS = KubernetesSessionClusterEntrypoint.class.getCanonicalName();
private KubernetesJobManagerSpecification kubernetesJobManagerSpecification;
......@@ -140,10 +144,10 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
.editOrNewSpec()
.endSpec()
.build());
this.flinkKubeClient.createTaskManagerPod(kubernetesPod);
this.flinkKubeClient.createTaskManagerPod(kubernetesPod).get();
final Pod resultTaskManagerPod =
this.kubeClient.pods().inNamespace(NAMESPACE).withName("mock-task-manager-pod").get();
this.kubeClient.pods().inNamespace(NAMESPACE).withName(TASKMANAGER_POD_NAME).get();
assertEquals(
this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().get(0).getMetadata().getUid(),
......@@ -151,7 +155,7 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
}
@Test
public void testStopPod() {
public void testStopPod() throws ExecutionException, InterruptedException {
final String podName = "pod-for-delete";
final Pod pod = new PodBuilder()
.editOrNewMetadata()
......@@ -164,7 +168,7 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
this.kubeClient.pods().inNamespace(NAMESPACE).create(pod);
assertNotNull(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get());
this.flinkKubeClient.stopPod(podName);
this.flinkKubeClient.stopPod(podName).get();
assertNull(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get());
}
......@@ -173,42 +177,45 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
final String hostName = "test-host-name";
mockExpectedServiceFromServerSide(buildExternalServiceWithLoadBalancer(hostName, ""));
final Endpoint resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
final Optional<Endpoint> resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertEquals(hostName, resultEndpoint.getAddress());
assertEquals(REST_PORT, resultEndpoint.getPort());
assertThat(resultEndpoint.isPresent(), is(true));
assertThat(resultEndpoint.get().getAddress(), is(hostName));
assertThat(resultEndpoint.get().getPort(), is(REST_PORT));
}
@Test
public void testServiceLoadBalancerEmptyHostAndIP() {
mockExpectedServiceFromServerSide(buildExternalServiceWithLoadBalancer("", ""));
final Endpoint resultEndpoint1 = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertNull(resultEndpoint1);
final Optional<Endpoint> resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.isPresent(), is(false));
}
@Test
public void testServiceLoadBalancerNullHostAndIP() {
mockExpectedServiceFromServerSide(buildExternalServiceWithLoadBalancer(null, null));
final Endpoint resultEndpoint2 = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertNull(resultEndpoint2);
final Optional<Endpoint> resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.isPresent(), is(false));
}
@Test
public void testNodePortService() {
mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
final Endpoint resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.getPort(), is(NODE_PORT));
final Optional<Endpoint> resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.isPresent(), is(true));
assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
}
@Test
public void testClusterIPService() {
mockExpectedServiceFromServerSide(buildExternalServiceWithClusterIP());
final Endpoint resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.getPort(), is(REST_PORT));
final Optional<Endpoint> resultEndpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
assertThat(resultEndpoint.isPresent(), is(true));
assertThat(resultEndpoint.get().getPort(), is(REST_PORT));
}
@Test
......@@ -217,12 +224,12 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
final KubernetesPod kubernetesPod = new KubernetesPod(new PodBuilder()
.editOrNewMetadata()
.withName("mock-task-manager-pod")
.withName(TASKMANAGER_POD_NAME)
.endMetadata()
.editOrNewSpec()
.endSpec()
.build());
this.flinkKubeClient.createTaskManagerPod(kubernetesPod);
this.flinkKubeClient.createTaskManagerPod(kubernetesPod).get();
assertEquals(1, this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().size());
assertEquals(1, this.kubeClient.configMaps().inNamespace(NAMESPACE).list().getItems().size());
......