未验证 提交 cb84e691 编写于 作者: W wangyang0918 提交者: Aljoscha Krettek

[FLINK-15251][kubernetes] Use hostname to create end point when ip in load balancer is null

上级 bd11e054
......@@ -191,19 +191,23 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
}
Service service = restService.getInternalResource();
String address;
String address = null;
if (service.getStatus() != null && (service.getStatus().getLoadBalancer() != null ||
service.getStatus().getLoadBalancer().getIngress() != null)) {
if (service.getStatus().getLoadBalancer().getIngress().size() > 0) {
address = service.getStatus().getLoadBalancer().getIngress().get(0).getIp();
if (address == null || address.isEmpty()) {
address = service.getStatus().getLoadBalancer().getIngress().get(0).getHostname();
}
} else {
address = this.internalClient.getMasterUrl().getHost();
restPort = getServiceNodePort(service, RestOptions.PORT);
}
} else if (service.getSpec().getExternalIPs() != null && service.getSpec().getExternalIPs().size() > 0) {
address = service.getSpec().getExternalIPs().get(0);
} else {
}
if (address == null || address.isEmpty()) {
return null;
}
return new Endpoint(address, restPort);
......
......@@ -46,6 +46,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
......@@ -76,6 +78,8 @@ public class KubernetesTestBase extends TestLogger {
protected static final String MOCK_SERVICE_ID = "mock-uuid-of-service";
protected static final String MOCK_SERVICE_HOST_NAME = "mock-host-name-of-service";
protected static final String MOCK_SERVICE_IP = "192.168.0.1";
@Before
......@@ -96,8 +100,8 @@ public class KubernetesTestBase extends TestLogger {
// Set mock requests.
mockInternalServiceActionWatch();
mockRestServiceActionWatcher();
mockGetRestService();
mockRestServiceActionWatcher(CLUSTER_ID);
mockGetRestService(CLUSTER_ID, MOCK_SERVICE_HOST_NAME, MOCK_SERVICE_IP);
}
protected FlinkKubeClient getFabric8FlinkKubeClient(){
......@@ -112,8 +116,8 @@ public class KubernetesTestBase extends TestLogger {
return server.getClient().inNamespace(NAMESPACE);
}
private void mockRestServiceActionWatcher() {
String serviceName = CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX;
protected void mockRestServiceActionWatcher(String clusterId) {
String serviceName = clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
String path = String.format("/api/v1/namespaces/%s/services?fieldSelector=metadata.name%%3D%s&watch=true",
NAMESPACE, serviceName);
......@@ -122,7 +126,7 @@ public class KubernetesTestBase extends TestLogger {
.andUpgradeToWebSocket()
.open()
.waitFor(1000)
.andEmit(new WatchEvent(getMockRestService(), "ADDED"))
.andEmit(new WatchEvent(getMockRestService(MOCK_SERVICE_HOST_NAME, MOCK_SERVICE_IP), "ADDED"))
.done()
.once();
}
......@@ -141,17 +145,17 @@ public class KubernetesTestBase extends TestLogger {
.once();
}
private void mockGetRestService() {
String serviceName = CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX;
protected void mockGetRestService(String clusterId, @Nullable String hostname, @Nullable String ip) {
String serviceName = clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
String path = String.format("/api/v1/namespaces/%s/services/%s", NAMESPACE, serviceName);
server.expect()
.withPath(path)
.andReturn(200, getMockRestService())
.andReturn(200, getMockRestService(hostname, ip))
.always();
}
private Service getMockRestService() {
private Service getMockRestService(@Nullable String hostname, @Nullable String ip) {
List<Decorator<Service, KubernetesService>> restServiceDecorators = new ArrayList<>();
restServiceDecorators.add(new InitializerDecorator<>(CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX));
String exposedType = FLINK_CONFIG.getString(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
......@@ -165,10 +169,9 @@ public class KubernetesTestBase extends TestLogger {
}
Service service = kubernetesService.getInternalResource();
String mockServiceHostName = "mock-host-name-of-service";
service.setStatus(new ServiceStatusBuilder()
.withLoadBalancer(new LoadBalancerStatus(Collections.singletonList(
new LoadBalancerIngress(mockServiceHostName, MOCK_SERVICE_IP)))).build());
new LoadBalancerIngress(hostname, ip)))).build());
return service;
}
......@@ -192,5 +195,4 @@ public class KubernetesTestBase extends TestLogger {
labels.put(Constants.LABEL_APP_KEY, CLUSTER_ID);
return labels;
}
}
......@@ -47,6 +47,8 @@ import java.util.stream.Collectors;
import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
/**
......@@ -245,4 +247,36 @@ public class Fabric8ClientTest extends KubernetesTestBase {
flinkKubeClient.stopPod(podName);
assertEquals(0, kubeClient.pods().list().getItems().size());
}
@Test
public void testServiceLoadBalancerWithNoIP() throws Exception {
final String hostName = "test-host-name";
final Endpoint endpoint = getRestEndpoint(hostName, "");
assertEquals(hostName, endpoint.getAddress());
assertEquals(8081, endpoint.getPort());
}
@Test
public void testServiceLoadBalancerEmptyHostAndIP() throws Exception {
final Endpoint endpoint1 = getRestEndpoint("", "");
assertNull(endpoint1);
final Endpoint endpoint2 = getRestEndpoint(null, null);
assertNull(endpoint2);
}
private Endpoint getRestEndpoint(String hostName, String ip) throws Exception {
final String clusterId = "flink-on-k8s-cluster-test";
mockRestServiceActionWatcher(clusterId);
mockGetRestService(clusterId, hostName, ip);
flinkKubeClient.createRestService(clusterId).get();
final Service services = kubeClient.services()
.withName(clusterId + Constants.FLINK_REST_SERVICE_SUFFIX)
.get();
assertNotNull(services);
return flinkKubeClient.getRestEndpoint(clusterId);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册