提交 20d0990f 编写于 作者: F felixzheng 提交者: tison

[FLINK-16194][k8s] Refactor and simplify KubernetesTestBase

- make KubernetesTestBase contains only the common logic of the client-side and cluster-side
- remove some unnecessary methods of getting kube client and mocking Service's ADD event
上级 0743b437
......@@ -25,15 +25,12 @@ import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.junit.Test;
......@@ -50,6 +47,8 @@ import static org.junit.Assert.assertTrue;
*/
public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
private static final String MOCK_SERVICE_IP = "192.168.0.1";
private final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.createClusterSpecification();
......@@ -57,15 +56,9 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
public void testDeploySessionCluster() throws Exception {
final ClusterClient<String> clusterClient = deploySessionCluster();
// Check updated flink config options
assertEquals(String.valueOf(Constants.BLOB_SERVER_PORT), FLINK_CONFIG.getString(BlobServerOptions.PORT));
assertEquals(String.valueOf(Constants.TASK_MANAGER_RPC_PORT), FLINK_CONFIG.getString(TaskManagerOptions.RPC_PORT));
assertEquals(CLUSTER_ID + "." + NAMESPACE, FLINK_CONFIG.getString(JobManagerOptions.ADDRESS));
assertEquals(MOCK_SERVICE_ID, FLINK_CONFIG.getString(KubernetesConfigOptionsInternal.SERVICE_ID));
final KubernetesClient kubeClient = server.getClient();
final ServiceList serviceList = kubeClient.services().list();
assertEquals(2, serviceList.getItems().size());
assertEquals(CLUSTER_ID, serviceList.getItems().get(0).getMetadata().getName());
assertEquals(String.valueOf(Constants.BLOB_SERVER_PORT), flinkConfig.getString(BlobServerOptions.PORT));
assertEquals(String.valueOf(Constants.TASK_MANAGER_RPC_PORT), flinkConfig.getString(TaskManagerOptions.RPC_PORT));
assertEquals(CLUSTER_ID + "." + NAMESPACE, flinkConfig.getString(JobManagerOptions.ADDRESS));
final Deployment jmDeployment = kubeClient
.apps()
......@@ -93,7 +86,7 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
@Test
public void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentException {
FLINK_CONFIG.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.toString());
flinkConfig.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.toString());
final ClusterClient<String> clusterClient = deploySessionCluster();
......@@ -121,8 +114,7 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
@Test
public void testKillCluster() throws Exception {
final FlinkKubeClient flinkKubeClient = getFabric8FlinkKubeClient();
final KubernetesClusterDescriptor descriptor = new KubernetesClusterDescriptor(FLINK_CONFIG, flinkKubeClient);
final KubernetesClusterDescriptor descriptor = new KubernetesClusterDescriptor(flinkConfig, flinkKubeClient);
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.createClusterSpecification();
......@@ -137,14 +129,10 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
// Mock kubernetes server do not delete the rest service by gc, so the rest service still exist.
final List<Service> services = kubeClient.services().list().getItems();
assertEquals(1, services.size());
assertEquals(
MOCK_SERVICE_ID,
services.get(0).getMetadata().getOwnerReferences().get(0).getUid());
}
private ClusterClient<String> deploySessionCluster() throws ClusterDeploymentException {
final FlinkKubeClient flinkKubeClient = getFabric8FlinkKubeClient();
final KubernetesClusterDescriptor descriptor = new KubernetesClusterDescriptor(FLINK_CONFIG, flinkKubeClient);
final KubernetesClusterDescriptor descriptor = new KubernetesClusterDescriptor(flinkConfig, flinkKubeClient);
final ClusterClient<String> clusterClient = descriptor
.deploySessionCluster(clusterSpecification)
......
......@@ -63,7 +63,8 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodStatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
......@@ -90,25 +91,30 @@ import static org.junit.Assert.assertTrue;
public class KubernetesResourceManagerTest extends KubernetesTestBase {
private static final Time TIMEOUT = Time.seconds(10L);
private static final String JOB_MANAGER_HOST = "jm-host1";
private TestingFatalErrorHandler testingFatalErrorHandler;
private final String jobManagerHost = "jm-host1";
private Configuration flinkConfig;
private TestingKubernetesResourceManager resourceManager;
private FlinkKubeClient flinkKubeClient;
@Before
public void setup() throws Exception {
testingFatalErrorHandler = new TestingFatalErrorHandler();
flinkConfig = new Configuration(FLINK_CONFIG);
super.setup();
flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
testingFatalErrorHandler = new TestingFatalErrorHandler();
flinkKubeClient = getFabric8FlinkKubeClient();
resourceManager = createAndStartResourceManager(flinkConfig);
final Deployment mockDeployment = new DeploymentBuilder()
.editOrNewMetadata()
.withName(CLUSTER_ID)
.withUid(CLUSTER_ID)
.endMetadata()
.build();
kubeClient.apps().deployments().inNamespace(NAMESPACE).create(mockDeployment);
}
@After
......@@ -178,8 +184,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
public void testStartAndStopWorker() throws Exception {
registerSlotRequest();
final KubernetesClient client = getKubeClient();
final PodList list = client.pods().list();
final PodList list = kubeClient.pods().list();
assertEquals(1, list.getItems().size());
final Pod pod = list.getItems().get(0);
......@@ -215,7 +220,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
return null;
});
unregisterAndReleaseFuture.get();
assertEquals(0, client.pods().list().getItems().size());
assertEquals(0, kubeClient.pods().list().getItems().size());
assertEquals(0, resourceManager.getWorkerNodes().size());
}
......@@ -223,38 +228,37 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
public void testTaskManagerPodTerminated() throws Exception {
registerSlotRequest();
final KubernetesClient client = getKubeClient();
final Pod pod1 = client.pods().list().getItems().get(0);
final Pod pod1 = kubeClient.pods().list().getItems().get(0);
final String taskManagerPrefix = CLUSTER_ID + "-taskmanager-1-";
resourceManager.onAdded(Collections.singletonList(new KubernetesPod(flinkConfig, pod1)));
// General modification event
resourceManager.onModified(Collections.singletonList(new KubernetesPod(flinkConfig, pod1)));
assertEquals(1, client.pods().list().getItems().size());
assertEquals(taskManagerPrefix + 1, client.pods().list().getItems().get(0).getMetadata().getName());
assertEquals(1, kubeClient.pods().list().getItems().size());
assertEquals(taskManagerPrefix + 1, kubeClient.pods().list().getItems().get(0).getMetadata().getName());
// Terminate the pod.
terminatePod(pod1);
resourceManager.onModified(Collections.singletonList(new KubernetesPod(flinkConfig, pod1)));
// Old pod should be deleted and a new task manager should be created
assertEquals(1, client.pods().list().getItems().size());
final Pod pod2 = client.pods().list().getItems().get(0);
assertEquals(1, kubeClient.pods().list().getItems().size());
final Pod pod2 = kubeClient.pods().list().getItems().get(0);
assertEquals(taskManagerPrefix + 2, pod2.getMetadata().getName());
// Error happens in the pod.
resourceManager.onAdded(Collections.singletonList(new KubernetesPod(flinkConfig, pod2)));
terminatePod(pod2);
resourceManager.onError(Collections.singletonList(new KubernetesPod(flinkConfig, pod2)));
final Pod pod3 = client.pods().list().getItems().get(0);
final Pod pod3 = kubeClient.pods().list().getItems().get(0);
assertEquals(taskManagerPrefix + 3, pod3.getMetadata().getName());
// Delete the pod.
resourceManager.onAdded(Collections.singletonList(new KubernetesPod(flinkConfig, pod3)));
terminatePod(pod3);
resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(flinkConfig, pod3)));
assertEquals(taskManagerPrefix + 4, client.pods().list().getItems().get(0).getMetadata().getName());
assertEquals(taskManagerPrefix + 4, kubeClient.pods().list().getItems().get(0).getMetadata().getName());
}
@Test
......@@ -267,8 +271,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
1024,
1,
new HashMap<>()));
final KubernetesClient client = getKubeClient();
assertEquals(1, client.pods().list().getItems().size());
assertEquals(1, kubeClient.pods().list().getItems().size());
// Call initialize method to recover worker nodes from previous attempt.
resourceManager.initialize();
......@@ -277,12 +280,12 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
// Register the previous taskmanager, no new pod should be created
registerTaskExecutor(new ResourceID(previewPodName));
registerSlotRequest();
assertEquals(1, client.pods().list().getItems().size());
assertEquals(1, kubeClient.pods().list().getItems().size());
// Register a new slot request, a new taskmanger pod will be created with attempt2
registerSlotRequest();
assertEquals(2, client.pods().list().getItems().size());
assertThat(client.pods().list().getItems().stream()
assertEquals(2, kubeClient.pods().list().getItems().size());
assertThat(kubeClient.pods().list().getItems().stream()
.map(e -> e.getMetadata().getName())
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(taskManagerPrefix + "1-1", taskManagerPrefix + "2-1"));
......@@ -341,7 +344,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
private void registerSlotRequest() throws Exception {
CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
resourceManager.getSlotManager().registerSlotRequest(
new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, jobManagerHost));
new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, JOB_MANAGER_HOST));
return null;
});
registerSlotRequestFuture.get();
......
......@@ -18,13 +18,9 @@
package org.apache.flink.kubernetes;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.utils.Constants;
......@@ -46,6 +42,12 @@ import java.util.Map;
* Base test class for Kubernetes.
*/
public class KubernetesTestBase extends TestLogger {
protected static final String NAMESPACE = "test";
protected static final String CLUSTER_ID = "my-flink-cluster1";
protected static final String CONTAINER_IMAGE = "flink-k8s-test:latest";
protected static final String CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent";
@Rule
public MixedKubernetesServer server = new MixedKubernetesServer(true, true);
......@@ -54,52 +56,32 @@ public class KubernetesTestBase extends TestLogger {
private File flinkConfDir;
protected static final String NAMESPACE = "test";
protected static final Configuration FLINK_CONFIG = new Configuration();
protected final Configuration flinkConfig = new Configuration();
protected static final String CLUSTER_ID = "my-flink-cluster1";
protected KubernetesClient kubeClient;
protected static final String CONTAINER_IMAGE = "flink-k8s-test:latest";
protected static final String MOCK_SERVICE_ID = "mock-uuid-of-service";
protected static final String MOCK_SERVICE_IP = "192.168.0.1";
protected static final String FLINK_MASTER_ENV_KEY = "LD_LIBRARY_PATH";
protected static final String FLINK_MASTER_ENV_VALUE = "/usr/lib/native";
protected FlinkKubeClient flinkKubeClient;
@Before
public void setUp() throws IOException {
FLINK_CONFIG.setString(KubernetesConfigOptions.NAMESPACE, NAMESPACE);
FLINK_CONFIG.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
FLINK_CONFIG.setString(KubernetesConfigOptions.CONTAINER_IMAGE, CONTAINER_IMAGE);
FLINK_CONFIG.setString(KubernetesConfigOptionsInternal.SERVICE_ID, MOCK_SERVICE_ID);
FLINK_CONFIG.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, "main-class");
FLINK_CONFIG.setString(BlobServerOptions.PORT, String.valueOf(Constants.BLOB_SERVER_PORT));
FLINK_CONFIG.setString(TaskManagerOptions.RPC_PORT, String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
FLINK_CONFIG.setString(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + FLINK_MASTER_ENV_KEY,
FLINK_MASTER_ENV_VALUE);
public void setup() throws Exception {
flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, NAMESPACE);
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE, CONTAINER_IMAGE);
flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY);
flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
BootstrapTools.writeConfiguration(new Configuration(), new File(flinkConfDir, "flink-conf.yaml"));
writeFlinkConfiguration();
Map<String, String> map = new HashMap<>();
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDir.toString());
TestBaseUtils.setEnv(map);
}
protected FlinkKubeClient getFabric8FlinkKubeClient(){
return getFabric8FlinkKubeClient(FLINK_CONFIG);
}
protected FlinkKubeClient getFabric8FlinkKubeClient(Configuration flinkConfig){
return new Fabric8FlinkKubeClient(flinkConfig, server.getClient().inNamespace(NAMESPACE));
kubeClient = server.getClient().inNamespace(NAMESPACE);
flinkKubeClient = new Fabric8FlinkKubeClient(flinkConfig, kubeClient);
}
protected KubernetesClient getKubeClient() {
return server.getClient().inNamespace(NAMESPACE);
protected void writeFlinkConfiguration() throws IOException {
BootstrapTools.writeConfiguration(this.flinkConfig, new File(flinkConfDir, "flink-conf.yaml"));
}
protected Map<String, String> getCommonLabels() {
......
......@@ -20,26 +20,17 @@ package org.apache.flink.kubernetes.kubeclient;
import org.apache.flink.kubernetes.KubernetesTestBase;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
/**
* Tests for Fabric implementation of {@link FlinkKubeClient}.
*/
public class Fabric8FlinkKubeClientTest extends KubernetesTestBase {
private FlinkKubeClient flinkKubeClient;
private KubernetesClient kubeClient;
@Before
public void setUp() throws IOException {
super.setUp();
flinkKubeClient = getFabric8FlinkKubeClient();
kubeClient = getKubeClient();
public void setup() throws Exception {
super.setup();
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册