Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
f6ad9685
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
f6ad9685
编写于
2月 23, 2020
作者:
F
felixzheng
提交者:
tison
3月 05, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-16194][k8s] Rework FlinkKubeClient to use the new decorator pattern
This closes #11233 .
上级
10f53290
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
376 addition
and
272 deletion
+376
-272
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
.../apache/flink/kubernetes/KubernetesClusterDescriptor.java
+11
-5
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
...rg/apache/flink/kubernetes/KubernetesResourceManager.java
+12
-42
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptionsInternal.java
...rnetes/configuration/KubernetesConfigOptionsInternal.java
+0
-6
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
...e/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+75
-8
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
...g/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
+7
-3
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/TaskManagerPodParameter.java
.../flink/kubernetes/kubeclient/TaskManagerPodParameter.java
+0
-71
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
...etes/kubeclient/factory/KubernetesTaskManagerFactory.java
+1
-1
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/ActionWatcher.java
.../flink/kubernetes/kubeclient/resources/ActionWatcher.java
+0
-63
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
...va/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+0
-50
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
...che/flink/kubernetes/KubernetesClusterDescriptorTest.java
+48
-7
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
...pache/flink/kubernetes/KubernetesResourceManagerTest.java
+12
-9
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
...ink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
+209
-6
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
.../kubeclient/factory/KubernetesTaskManagerFactoryTest.java
+1
-1
未找到文件。
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
浏览文件 @
f6ad9685
...
@@ -36,6 +36,9 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal
...
@@ -36,6 +36,9 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal
import
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
;
import
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
;
import
org.apache.flink.kubernetes.kubeclient.Endpoint
;
import
org.apache.flink.kubernetes.kubeclient.Endpoint
;
import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
;
import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
;
import
org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification
;
import
org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory
;
import
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
;
import
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
;
...
@@ -163,10 +166,6 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
...
@@ -163,10 +166,6 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
TaskManagerOptions
.
RPC_PORT
,
TaskManagerOptions
.
RPC_PORT
,
Constants
.
TASK_MANAGER_RPC_PORT
);
Constants
.
TASK_MANAGER_RPC_PORT
);
// Set jobmanager address to namespaced service name
final
String
nameSpace
=
flinkConfig
.
getString
(
KubernetesConfigOptions
.
NAMESPACE
);
flinkConfig
.
setString
(
JobManagerOptions
.
ADDRESS
,
clusterId
+
"."
+
nameSpace
);
if
(
HighAvailabilityMode
.
isHighAvailabilityModeActivated
(
flinkConfig
))
{
if
(
HighAvailabilityMode
.
isHighAvailabilityModeActivated
(
flinkConfig
))
{
flinkConfig
.
setString
(
HighAvailabilityOptions
.
HA_CLUSTER_ID
,
clusterId
);
flinkConfig
.
setString
(
HighAvailabilityOptions
.
HA_CLUSTER_ID
,
clusterId
);
KubernetesUtils
.
checkAndUpdatePortConfigOption
(
KubernetesUtils
.
checkAndUpdatePortConfigOption
(
...
@@ -176,7 +175,14 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
...
@@ -176,7 +175,14 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
}
}
try
{
try
{
// todo
final
KubernetesJobManagerParameters
kubernetesJobManagerParameters
=
new
KubernetesJobManagerParameters
(
flinkConfig
,
clusterSpecification
);
final
KubernetesJobManagerSpecification
kubernetesJobManagerSpec
=
KubernetesJobManagerFactory
.
createJobManagerComponent
(
kubernetesJobManagerParameters
);
client
.
createJobManagerComponent
(
kubernetesJobManagerSpec
);
return
createClusterClientProvider
(
clusterId
);
return
createClusterClientProvider
(
clusterId
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
client
.
handleException
(
e
);
client
.
handleException
(
e
);
...
...
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
浏览文件 @
f6ad9685
...
@@ -24,10 +24,9 @@ import org.apache.flink.configuration.GlobalConfiguration;
...
@@ -24,10 +24,9 @@ import org.apache.flink.configuration.GlobalConfiguration;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
;
import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
;
import
org.apache.flink.kubernetes.kubeclient.KubeClientFactory
;
import
org.apache.flink.kubernetes.kubeclient.KubeClientFactory
;
import
org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter
;
import
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory
;
import
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
org.apache.flink.runtime.clusterframework.ApplicationStatus
;
import
org.apache.flink.runtime.clusterframework.ApplicationStatus
;
import
org.apache.flink.runtime.clusterframework.BootstrapTools
;
import
org.apache.flink.runtime.clusterframework.BootstrapTools
;
...
@@ -36,7 +35,6 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
...
@@ -36,7 +35,6 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import
org.apache.flink.runtime.clusterframework.types.ResourceID
;
import
org.apache.flink.runtime.clusterframework.types.ResourceID
;
import
org.apache.flink.runtime.clusterframework.types.ResourceProfile
;
import
org.apache.flink.runtime.clusterframework.types.ResourceProfile
;
import
org.apache.flink.runtime.entrypoint.ClusterInformation
;
import
org.apache.flink.runtime.entrypoint.ClusterInformation
;
import
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions
;
import
org.apache.flink.runtime.heartbeat.HeartbeatServices
;
import
org.apache.flink.runtime.heartbeat.HeartbeatServices
;
import
org.apache.flink.runtime.highavailability.HighAvailabilityServices
;
import
org.apache.flink.runtime.highavailability.HighAvailabilityServices
;
import
org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup
;
import
org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup
;
...
@@ -53,8 +51,6 @@ import org.slf4j.LoggerFactory;
...
@@ -53,8 +51,6 @@ import org.slf4j.LoggerFactory;
import
javax.annotation.Nullable
;
import
javax.annotation.Nullable
;
import
java.io.File
;
import
java.util.Arrays
;
import
java.util.Collection
;
import
java.util.Collection
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
...
@@ -89,8 +85,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
...
@@ -89,8 +85,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
private
final
ContaineredTaskManagerParameters
taskManagerParameters
;
private
final
ContaineredTaskManagerParameters
taskManagerParameters
;
private
final
List
<
String
>
taskManagerStartCommand
;
/** The number of pods requested, but not yet granted. */
/** The number of pods requested, but not yet granted. */
private
int
numPendingPodRequests
=
0
;
private
int
numPendingPodRequests
=
0
;
...
@@ -126,8 +120,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
...
@@ -126,8 +120,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
this
.
taskManagerParameters
=
this
.
taskManagerParameters
=
ContaineredTaskManagerParameters
.
create
(
flinkConfig
,
taskExecutorProcessSpec
,
numSlotsPerTaskManager
);
ContaineredTaskManagerParameters
.
create
(
flinkConfig
,
taskExecutorProcessSpec
,
numSlotsPerTaskManager
);
this
.
taskManagerStartCommand
=
getTaskManagerStartCommand
();
}
}
@Override
@Override
...
@@ -259,19 +251,21 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
...
@@ -259,19 +251,21 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
currentMaxAttemptId
,
currentMaxAttemptId
,
++
currentMaxPodId
);
++
currentMaxPodId
);
final
HashMap
<
String
,
String
>
env
=
new
HashMap
<>();
final
String
dynamicProperties
=
env
.
put
(
Constants
.
ENV_FLINK_POD_NAME
,
podName
);
BootstrapTools
.
getDynamicPropertiesAsString
(
flinkClientConfig
,
flinkConfig
);
env
.
putAll
(
taskManagerParameters
.
taskManagerEnv
());
final
TaskManagerPodParameter
parameter
=
new
TaskManagerPodParameter
(
final
KubernetesTaskManagerParameters
kubernetesTaskManagerParameters
=
new
KubernetesTaskManagerParameters
(
flinkConfig
,
podName
,
podName
,
taskManagerStartCommand
,
defaultMemoryMB
,
defaultMemoryMB
,
defaultCpus
,
dynamicProperties
,
env
);
taskManagerParameters
);
final
KubernetesPod
taskManagerPod
=
KubernetesTaskManagerFactory
.
createTaskManagerComponent
(
kubernetesTaskManagerParameters
);
log
.
info
(
"TaskManager {} will be started with {}."
,
podName
,
taskExecutorProcessSpec
);
log
.
info
(
"TaskManager {} will be started with {}."
,
podName
,
taskExecutorProcessSpec
);
kubeClient
.
createTaskManagerPod
(
parameter
);
kubeClient
.
createTaskManagerPod
(
taskManagerPod
);
}
}
/**
/**
...
@@ -296,30 +290,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
...
@@ -296,30 +290,6 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
}
}
}
}
private
List
<
String
>
getTaskManagerStartCommand
()
{
final
String
confDir
=
flinkConfig
.
getString
(
KubernetesConfigOptions
.
FLINK_CONF_DIR
);
final
boolean
hasLogback
=
new
File
(
confDir
,
Constants
.
CONFIG_FILE_LOGBACK_NAME
).
exists
();
final
boolean
hasLog4j
=
new
File
(
confDir
,
Constants
.
CONFIG_FILE_LOG4J_NAME
).
exists
();
final
String
logDir
=
flinkConfig
.
getString
(
KubernetesConfigOptions
.
FLINK_LOG_DIR
);
final
String
mainClassArgs
=
"--"
+
CommandLineOptions
.
CONFIG_DIR_OPTION
.
getLongOpt
()
+
" "
+
flinkConfig
.
getString
(
KubernetesConfigOptions
.
FLINK_CONF_DIR
)
+
" "
+
BootstrapTools
.
getDynamicPropertiesAsString
(
flinkClientConfig
,
flinkConfig
);
final
String
command
=
KubernetesUtils
.
getTaskManagerStartCommand
(
flinkConfig
,
taskManagerParameters
,
confDir
,
logDir
,
hasLogback
,
hasLog4j
,
KubernetesTaskExecutorRunner
.
class
.
getCanonicalName
(),
mainClassArgs
);
return
Arrays
.
asList
(
"/bin/bash"
,
"-c"
,
command
);
}
protected
FlinkKubeClient
createFlinkKubeClient
()
{
protected
FlinkKubeClient
createFlinkKubeClient
()
{
return
KubeClientFactory
.
fromConfiguration
(
flinkConfig
);
return
KubeClientFactory
.
fromConfiguration
(
flinkConfig
);
}
}
...
...
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptionsInternal.java
浏览文件 @
f6ad9685
...
@@ -28,12 +28,6 @@ import org.apache.flink.configuration.ConfigOptions;
...
@@ -28,12 +28,6 @@ import org.apache.flink.configuration.ConfigOptions;
@Internal
@Internal
public
class
KubernetesConfigOptionsInternal
{
public
class
KubernetesConfigOptionsInternal
{
public
static
final
ConfigOption
<
String
>
SERVICE_ID
=
ConfigOptions
.
key
(
"kubernetes.internal.service.id"
)
.
stringType
()
.
noDefaultValue
()
.
withDescription
(
"The service id will be set in configuration after created. It will be used for gc."
);
public
static
final
ConfigOption
<
String
>
ENTRY_POINT_CLASS
=
ConfigOptions
public
static
final
ConfigOption
<
String
>
ENTRY_POINT_CLASS
=
ConfigOptions
.
key
(
"kubernetes.internal.jobmanager.entrypoint.class"
)
.
key
(
"kubernetes.internal.jobmanager.entrypoint.class"
)
.
stringType
()
.
stringType
()
...
...
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
浏览文件 @
f6ad9685
...
@@ -24,11 +24,15 @@ import org.apache.flink.configuration.RestOptions;
...
@@ -24,11 +24,15 @@ import org.apache.flink.configuration.RestOptions;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
;
import
org.apache.flink.kubernetes.utils.
Constant
s
;
import
org.apache.flink.kubernetes.utils.
KubernetesUtil
s
;
import
io.fabric8.kubernetes.api.model.HasMetadata
;
import
io.fabric8.kubernetes.api.model.OwnerReference
;
import
io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
;
import
io.fabric8.kubernetes.api.model.Pod
;
import
io.fabric8.kubernetes.api.model.Pod
;
import
io.fabric8.kubernetes.api.model.Service
;
import
io.fabric8.kubernetes.api.model.Service
;
import
io.fabric8.kubernetes.api.model.ServicePort
;
import
io.fabric8.kubernetes.api.model.ServicePort
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
import
io.fabric8.kubernetes.client.KubernetesClient
;
import
io.fabric8.kubernetes.client.KubernetesClient
;
import
io.fabric8.kubernetes.client.KubernetesClientException
;
import
io.fabric8.kubernetes.client.KubernetesClientException
;
import
io.fabric8.kubernetes.client.Watcher
;
import
io.fabric8.kubernetes.client.Watcher
;
...
@@ -66,8 +70,52 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
...
@@ -66,8 +70,52 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
}
}
@Override
@Override
public
void
createTaskManagerPod
(
TaskManagerPodParameter
parameter
)
{
public
void
createJobManagerComponent
(
KubernetesJobManagerSpecification
kubernetesJMSpec
)
{
// todo
final
Deployment
deployment
=
kubernetesJMSpec
.
getDeployment
();
final
List
<
HasMetadata
>
accompanyingResources
=
kubernetesJMSpec
.
getAccompanyingResources
();
// create Deployment
LOG
.
debug
(
"Start to create deployment with spec {}"
,
deployment
.
getSpec
().
toString
());
final
Deployment
createdDeployment
=
this
.
internalClient
.
apps
()
.
deployments
()
.
inNamespace
(
this
.
nameSpace
)
.
create
(
deployment
);
// Note that we should use the uid of the created Deployment for the OwnerReference.
setOwnerReference
(
createdDeployment
,
accompanyingResources
);
this
.
internalClient
.
resourceList
(
accompanyingResources
)
.
inNamespace
(
this
.
nameSpace
)
.
createOrReplace
();
}
@Override
public
void
createTaskManagerPod
(
KubernetesPod
kubernetesPod
)
{
final
Deployment
masterDeployment
=
this
.
internalClient
.
apps
()
.
deployments
()
.
inNamespace
(
this
.
nameSpace
)
.
withName
(
KubernetesUtils
.
getDeploymentName
(
clusterId
))
.
get
();
if
(
masterDeployment
==
null
)
{
throw
new
RuntimeException
(
"Failed to find Deployment named "
+
clusterId
+
" in namespace "
+
this
.
nameSpace
);
}
// Note that we should use the uid of the master Deployment for the OwnerReference.
setOwnerReference
(
masterDeployment
,
Collections
.
singletonList
(
kubernetesPod
.
getInternalResource
()));
LOG
.
debug
(
"Start to create pod with metadata {}, spec {}"
,
kubernetesPod
.
getInternalResource
().
getMetadata
(),
kubernetesPod
.
getInternalResource
().
getSpec
());
this
.
internalClient
.
pods
()
.
inNamespace
(
this
.
nameSpace
)
.
create
(
kubernetesPod
.
getInternalResource
());
}
}
@Override
@Override
...
@@ -83,7 +131,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
...
@@ -83,7 +131,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
// Return the service.namespace directly when use ClusterIP.
// Return the service.namespace directly when use ClusterIP.
if
(
serviceExposedType
.
equals
(
KubernetesConfigOptions
.
ServiceExposedType
.
ClusterIP
.
toString
()))
{
if
(
serviceExposedType
.
equals
(
KubernetesConfigOptions
.
ServiceExposedType
.
ClusterIP
.
toString
()))
{
return
new
Endpoint
(
clusterId
+
"."
+
nameSpace
,
restPort
);
return
new
Endpoint
(
KubernetesUtils
.
getInternalServiceName
(
clusterId
)
+
"."
+
nameSpace
,
restPort
);
}
}
KubernetesService
restService
=
getRestService
(
clusterId
);
KubernetesService
restService
=
getRestService
(
clusterId
);
...
@@ -130,7 +178,13 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
...
@@ -130,7 +178,13 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
@Override
public
void
stopAndCleanupCluster
(
String
clusterId
)
{
public
void
stopAndCleanupCluster
(
String
clusterId
)
{
this
.
internalClient
.
services
().
inNamespace
(
this
.
nameSpace
).
withName
(
clusterId
).
cascading
(
true
).
delete
();
this
.
internalClient
.
apps
()
.
deployments
()
.
inNamespace
(
this
.
nameSpace
)
.
withName
(
KubernetesUtils
.
getDeploymentName
(
clusterId
))
.
cascading
(
true
)
.
delete
();
}
}
@Override
@Override
...
@@ -138,16 +192,16 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
...
@@ -138,16 +192,16 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
LOG
.
error
(
"A Kubernetes exception occurred."
,
e
);
LOG
.
error
(
"A Kubernetes exception occurred."
,
e
);
}
}
@Override
@Nullable
@Nullable
@Override
public
KubernetesService
getInternalService
(
String
clusterId
)
{
public
KubernetesService
getInternalService
(
String
clusterId
)
{
return
getService
(
clusterId
);
return
getService
(
KubernetesUtils
.
getInternalServiceName
(
clusterId
)
);
}
}
@Override
@Override
@Nullable
@Nullable
public
KubernetesService
getRestService
(
String
clusterId
)
{
public
KubernetesService
getRestService
(
String
clusterId
)
{
return
getService
(
clusterId
+
Constants
.
FLINK_REST_SERVICE_SUFFIX
);
return
getService
(
KubernetesUtils
.
getRestServiceName
(
clusterId
)
);
}
}
@Override
@Override
...
@@ -188,6 +242,19 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
...
@@ -188,6 +242,19 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
this
.
internalClient
.
close
();
this
.
internalClient
.
close
();
}
}
private
void
setOwnerReference
(
Deployment
deployment
,
List
<
HasMetadata
>
resources
)
{
final
OwnerReference
deploymentOwnerReference
=
new
OwnerReferenceBuilder
()
.
withName
(
deployment
.
getMetadata
().
getName
())
.
withApiVersion
(
deployment
.
getApiVersion
())
.
withUid
(
deployment
.
getMetadata
().
getUid
())
.
withKind
(
deployment
.
getKind
())
.
withController
(
true
)
.
withBlockOwnerDeletion
(
true
)
.
build
();
resources
.
forEach
(
resource
->
resource
.
getMetadata
().
setOwnerReferences
(
Collections
.
singletonList
(
deploymentOwnerReference
)));
}
private
KubernetesService
getService
(
String
serviceName
)
{
private
KubernetesService
getService
(
String
serviceName
)
{
final
Service
service
=
this
final
Service
service
=
this
.
internalClient
.
internalClient
...
...
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
浏览文件 @
f6ad9685
...
@@ -32,11 +32,15 @@ import java.util.Map;
...
@@ -32,11 +32,15 @@ import java.util.Map;
public
interface
FlinkKubeClient
extends
AutoCloseable
{
public
interface
FlinkKubeClient
extends
AutoCloseable
{
/**
/**
* Create t
ask manager pod
.
* Create t
he Master components, this can include the Deployment, the ConfigMap(s), and the Service(s)
.
*
*
* @param parameter {@link TaskManagerPodParameter} to create a taskmanager pod.
*/
*/
void
createTaskManagerPod
(
TaskManagerPodParameter
parameter
);
void
createJobManagerComponent
(
KubernetesJobManagerSpecification
kubernetesJMSpec
);
/**
* Create task manager pod.
*/
void
createTaskManagerPod
(
KubernetesPod
kubernetesPod
);
/**
/**
* Stop a specified pod by name.
* Stop a specified pod by name.
...
...
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/TaskManagerPodParameter.java
已删除
100644 → 0
浏览文件 @
10f53290
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.kubernetes.kubeclient
;
import
java.util.List
;
import
java.util.Map
;
/**
* Parameters to create a taskmanager pod.
*/
public
class
TaskManagerPodParameter
{
private
final
String
podName
;
private
final
List
<
String
>
args
;
private
final
int
taskManagerMemoryInMB
;
private
final
double
taskManagerCpus
;
private
final
Map
<
String
,
String
>
environmentVariables
;
public
TaskManagerPodParameter
(
String
podName
,
List
<
String
>
args
,
int
taskManagerMemoryInMB
,
double
taskManagerCpus
,
Map
<
String
,
String
>
environmentVariables
)
{
this
.
podName
=
podName
;
this
.
args
=
args
;
this
.
taskManagerMemoryInMB
=
taskManagerMemoryInMB
;
this
.
taskManagerCpus
=
taskManagerCpus
;
this
.
environmentVariables
=
environmentVariables
;
}
public
String
getPodName
()
{
return
podName
;
}
public
List
<
String
>
getArgs
()
{
return
args
;
}
public
Map
<
String
,
String
>
getEnvironmentVariables
()
{
return
environmentVariables
;
}
public
int
getTaskManagerMemoryInMB
()
{
return
taskManagerMemoryInMB
;
}
public
double
getTaskManagerCpus
()
{
return
taskManagerCpus
;
}
}
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
浏览文件 @
f6ad9685
...
@@ -34,7 +34,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
...
@@ -34,7 +34,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
*/
*/
public
class
KubernetesTaskManagerFactory
{
public
class
KubernetesTaskManagerFactory
{
public
static
KubernetesPod
build
TaskManagerComponent
(
KubernetesTaskManagerParameters
kubernetesTaskManagerParameters
)
{
public
static
KubernetesPod
create
TaskManagerComponent
(
KubernetesTaskManagerParameters
kubernetesTaskManagerParameters
)
{
FlinkPod
flinkPod
=
new
FlinkPod
.
Builder
().
build
();
FlinkPod
flinkPod
=
new
FlinkPod
.
Builder
().
build
();
final
KubernetesStepDecorator
[]
stepDecorators
=
new
KubernetesStepDecorator
[]
{
final
KubernetesStepDecorator
[]
stepDecorators
=
new
KubernetesStepDecorator
[]
{
...
...
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/ActionWatcher.java
已删除
100644 → 0
浏览文件 @
10f53290
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.kubernetes.kubeclient.resources
;
import
io.fabric8.kubernetes.api.model.HasMetadata
;
import
io.fabric8.kubernetes.client.KubernetesClientException
;
import
io.fabric8.kubernetes.client.KubernetesClientTimeoutException
;
import
io.fabric8.kubernetes.client.Watcher
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicReference
;
/**
* Watch a specific action.
*/
public
class
ActionWatcher
<
T
extends
HasMetadata
>
implements
Watcher
<
T
>
{
private
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
private
final
AtomicReference
<
T
>
reference
=
new
AtomicReference
<>();
private
final
T
resource
;
private
final
Action
expectedAction
;
public
ActionWatcher
(
Action
expectedAction
,
T
resource
)
{
this
.
resource
=
resource
;
this
.
expectedAction
=
expectedAction
;
}
@Override
public
void
eventReceived
(
Action
action
,
T
resource
)
{
if
(
action
==
this
.
expectedAction
)
{
this
.
reference
.
set
(
resource
);
this
.
latch
.
countDown
();
}
}
@Override
public
void
onClose
(
KubernetesClientException
e
)
{
}
public
T
await
(
long
amount
,
TimeUnit
timeUnit
)
throws
InterruptedException
{
if
(
this
.
latch
.
await
(
amount
,
timeUnit
))
{
return
this
.
reference
.
get
();
}
else
{
throw
new
KubernetesClientTimeoutException
(
this
.
resource
,
amount
,
timeUnit
);
}
}
}
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
浏览文件 @
f6ad9685
...
@@ -23,18 +23,11 @@ import org.apache.flink.configuration.Configuration;
...
@@ -23,18 +23,11 @@ import org.apache.flink.configuration.Configuration;
import
org.apache.flink.configuration.CoreOptions
;
import
org.apache.flink.configuration.CoreOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.runtime.clusterframework.BootstrapTools
;
import
org.apache.flink.runtime.clusterframework.BootstrapTools
;
import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters
;
import
org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec
;
import
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils
;
import
org.apache.flink.util.FlinkRuntimeException
;
import
org.apache.flink.util.FlinkRuntimeException
;
import
io.fabric8.kubernetes.api.model.HasMetadata
;
import
io.fabric8.kubernetes.api.model.OwnerReference
;
import
io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
;
import
io.fabric8.kubernetes.api.model.Quantity
;
import
io.fabric8.kubernetes.api.model.Quantity
;
import
io.fabric8.kubernetes.api.model.ResourceRequirements
;
import
io.fabric8.kubernetes.api.model.ResourceRequirements
;
import
io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder
;
import
io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
...
@@ -46,9 +39,7 @@ import java.io.FileInputStream;
...
@@ -46,9 +39,7 @@ import java.io.FileInputStream;
import
java.io.FileNotFoundException
;
import
java.io.FileNotFoundException
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.io.InputStreamReader
;
import
java.io.InputStreamReader
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
static
org
.
apache
.
flink
.
util
.
Preconditions
.
checkNotNull
;
import
static
org
.
apache
.
flink
.
util
.
Preconditions
.
checkNotNull
;
...
@@ -121,47 +112,6 @@ public class KubernetesUtils {
...
@@ -121,47 +112,6 @@ public class KubernetesUtils {
}
}
}
}
/**
* Generates the shell command to start a task manager for kubernetes.
*
* @param flinkConfig The Flink configuration.
* @param tmParams Parameters for the task manager.
* @param configDirectory The configuration directory for the flink-conf.yaml
* @param logDirectory The log directory.
* @param hasLogback Uses logback?
* @param hasLog4j Uses log4j?
* @param mainClass The main class to start with.
* @param mainArgs The args for main class.
* @return A String containing the task manager startup command.
*/
public
static
String
getTaskManagerStartCommand
(
Configuration
flinkConfig
,
ContaineredTaskManagerParameters
tmParams
,
String
configDirectory
,
String
logDirectory
,
boolean
hasLogback
,
boolean
hasLog4j
,
String
mainClass
,
@Nullable
String
mainArgs
)
{
final
TaskExecutorProcessSpec
taskExecutorProcessSpec
=
tmParams
.
getTaskExecutorProcessSpec
();
final
String
jvmMemOpts
=
TaskExecutorProcessUtils
.
generateJvmParametersStr
(
taskExecutorProcessSpec
);
String
args
=
TaskExecutorProcessUtils
.
generateDynamicConfigsStr
(
taskExecutorProcessSpec
);
if
(
mainArgs
!=
null
)
{
args
+=
" "
+
mainArgs
;
}
return
getCommonStartCommand
(
flinkConfig
,
ClusterComponent
.
TASK_MANAGER
,
jvmMemOpts
,
configDirectory
,
logDirectory
,
hasLogback
,
hasLog4j
,
mainClass
,
args
);
}
/**
/**
* Generate name of the internal Service.
* Generate name of the internal Service.
*/
*/
...
...
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
浏览文件 @
f6ad9685
...
@@ -26,16 +26,22 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
...
@@ -26,16 +26,22 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import
org.apache.flink.configuration.JobManagerOptions
;
import
org.apache.flink.configuration.JobManagerOptions
;
import
org.apache.flink.configuration.TaskManagerOptions
;
import
org.apache.flink.configuration.TaskManagerOptions
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
org.apache.flink.runtime.jobmanager.HighAvailabilityMode
;
import
org.apache.flink.runtime.jobmanager.HighAvailabilityMode
;
import
io.fabric8.kubernetes.api.model.Container
;
import
io.fabric8.kubernetes.api.model.Container
;
import
io.fabric8.kubernetes.api.model.EnvVar
;
import
io.fabric8.kubernetes.api.model.EnvVar
;
import
io.fabric8.kubernetes.api.model.LoadBalancerIngress
;
import
io.fabric8.kubernetes.api.model.LoadBalancerStatus
;
import
io.fabric8.kubernetes.api.model.Service
;
import
io.fabric8.kubernetes.api.model.Service
;
import
io.fabric8.kubernetes.api.model.ServiceBuilder
;
import
io.fabric8.kubernetes.api.model.ServiceStatusBuilder
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
import
io.fabric8.kubernetes.client.KubernetesClient
;
import
io.fabric8.kubernetes.client.KubernetesClient
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.Test
;
import
java.util.
List
;
import
java.util.
Collections
;
import
java.util.stream.Collectors
;
import
java.util.stream.Collectors
;
import
static
org
.
apache
.
flink
.
kubernetes
.
utils
.
Constants
.
ENV_FLINK_POD_IP_ADDRESS
;
import
static
org
.
apache
.
flink
.
kubernetes
.
utils
.
Constants
.
ENV_FLINK_POD_IP_ADDRESS
;
...
@@ -46,19 +52,27 @@ import static org.junit.Assert.assertTrue;
...
@@ -46,19 +52,27 @@ import static org.junit.Assert.assertTrue;
* Tests for the {@link KubernetesClusterDescriptor}.
* Tests for the {@link KubernetesClusterDescriptor}.
*/
*/
public
class
KubernetesClusterDescriptorTest
extends
KubernetesTestBase
{
public
class
KubernetesClusterDescriptorTest
extends
KubernetesTestBase
{
private
static
final
String
MOCK_SERVICE_HOST_NAME
=
"mock-host-name-of-service"
;
private
static
final
String
MOCK_SERVICE_IP
=
"192.168.0.1"
;
private
static
final
String
MOCK_SERVICE_IP
=
"192.168.0.1"
;
private
final
ClusterSpecification
clusterSpecification
=
new
ClusterSpecification
.
ClusterSpecificationBuilder
()
private
final
ClusterSpecification
clusterSpecification
=
new
ClusterSpecification
.
ClusterSpecificationBuilder
()
.
createClusterSpecification
();
.
createClusterSpecification
();
@Before
public
void
setup
()
throws
Exception
{
super
.
setup
();
mockGetRestServiceWithLoadBalancer
(
MOCK_SERVICE_HOST_NAME
,
MOCK_SERVICE_IP
);
}
@Test
@Test
public
void
testDeploySessionCluster
()
throws
Exception
{
public
void
testDeploySessionCluster
()
throws
Exception
{
final
ClusterClient
<
String
>
clusterClient
=
deploySessionCluster
();
final
ClusterClient
<
String
>
clusterClient
=
deploySessionCluster
();
// Check updated flink config options
// Check updated flink config options
assertEquals
(
String
.
valueOf
(
Constants
.
BLOB_SERVER_PORT
),
flinkConfig
.
getString
(
BlobServerOptions
.
PORT
));
assertEquals
(
String
.
valueOf
(
Constants
.
BLOB_SERVER_PORT
),
flinkConfig
.
getString
(
BlobServerOptions
.
PORT
));
assertEquals
(
String
.
valueOf
(
Constants
.
TASK_MANAGER_RPC_PORT
),
flinkConfig
.
getString
(
TaskManagerOptions
.
RPC_PORT
));
assertEquals
(
String
.
valueOf
(
Constants
.
TASK_MANAGER_RPC_PORT
),
flinkConfig
.
getString
(
TaskManagerOptions
.
RPC_PORT
));
assertEquals
(
CLUSTER_ID
+
"."
+
NAMESPACE
,
flinkConfig
.
getString
(
JobManagerOptions
.
ADDRESS
));
assertEquals
(
KubernetesUtils
.
getInternalServiceName
(
CLUSTER_ID
)
+
"."
+
NAMESPACE
,
flinkConfig
.
getString
(
JobManagerOptions
.
ADDRESS
));
final
Deployment
jmDeployment
=
kubeClient
final
Deployment
jmDeployment
=
kubeClient
.
apps
()
.
apps
()
...
@@ -87,7 +101,6 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
...
@@ -87,7 +101,6 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
@Test
@Test
public
void
testDeployHighAvailabilitySessionCluster
()
throws
ClusterDeploymentException
{
public
void
testDeployHighAvailabilitySessionCluster
()
throws
ClusterDeploymentException
{
flinkConfig
.
setString
(
HighAvailabilityOptions
.
HA_MODE
,
HighAvailabilityMode
.
ZOOKEEPER
.
toString
());
flinkConfig
.
setString
(
HighAvailabilityOptions
.
HA_MODE
,
HighAvailabilityMode
.
ZOOKEEPER
.
toString
());
final
ClusterClient
<
String
>
clusterClient
=
deploySessionCluster
();
final
ClusterClient
<
String
>
clusterClient
=
deploySessionCluster
();
final
KubernetesClient
kubeClient
=
server
.
getClient
();
final
KubernetesClient
kubeClient
=
server
.
getClient
();
...
@@ -126,9 +139,10 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
...
@@ -126,9 +139,10 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
descriptor
.
killCluster
(
CLUSTER_ID
);
descriptor
.
killCluster
(
CLUSTER_ID
);
// Mock kubernetes server do not delete the rest service by gc, so the rest service still exist.
// Mock kubernetes server do not delete the accompanying resources by gc.
final
List
<
Service
>
services
=
kubeClient
.
services
().
list
().
getItems
();
assertTrue
(
kubeClient
.
apps
().
deployments
().
list
().
getItems
().
isEmpty
());
assertEquals
(
1
,
services
.
size
());
assertEquals
(
2
,
kubeClient
.
services
().
list
().
getItems
().
size
());
assertEquals
(
1
,
kubeClient
.
configMaps
().
list
().
getItems
().
size
());
}
}
private
ClusterClient
<
String
>
deploySessionCluster
()
throws
ClusterDeploymentException
{
private
ClusterClient
<
String
>
deploySessionCluster
()
throws
ClusterDeploymentException
{
...
@@ -144,4 +158,31 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
...
@@ -144,4 +158,31 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
return
clusterClient
;
return
clusterClient
;
}
}
private
void
mockGetRestServiceWithLoadBalancer
(
String
hostname
,
String
ip
)
{
final
String
restServiceName
=
KubernetesUtils
.
getRestServiceName
(
CLUSTER_ID
);
final
String
path
=
String
.
format
(
"/api/v1/namespaces/%s/services/%s"
,
NAMESPACE
,
restServiceName
);
server
.
expect
()
.
get
()
.
withPath
(
path
)
.
andReturn
(
200
,
buildMockRestService
(
hostname
,
ip
))
.
always
();
}
private
Service
buildMockRestService
(
String
hostname
,
String
ip
)
{
final
Service
service
=
new
ServiceBuilder
()
.
editOrNewMetadata
()
.
withName
(
KubernetesUtils
.
getRestServiceName
(
CLUSTER_ID
))
.
endMetadata
()
.
editOrNewSpec
()
.
endSpec
()
.
build
();
service
.
setStatus
(
new
ServiceStatusBuilder
()
.
withLoadBalancer
(
new
LoadBalancerStatus
(
Collections
.
singletonList
(
new
LoadBalancerIngress
(
hostname
,
ip
)))).
build
());
return
service
;
}
}
}
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
浏览文件 @
f6ad9685
...
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.MemorySize;
...
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.MemorySize;
import
org.apache.flink.configuration.TaskManagerOptions
;
import
org.apache.flink.configuration.TaskManagerOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
;
import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
;
import
org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.Constants
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
...
@@ -62,6 +61,7 @@ import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
...
@@ -62,6 +61,7 @@ import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
import
io.fabric8.kubernetes.api.model.ContainerStatusBuilder
;
import
io.fabric8.kubernetes.api.model.ContainerStatusBuilder
;
import
io.fabric8.kubernetes.api.model.EnvVarBuilder
;
import
io.fabric8.kubernetes.api.model.EnvVarBuilder
;
import
io.fabric8.kubernetes.api.model.Pod
;
import
io.fabric8.kubernetes.api.model.Pod
;
import
io.fabric8.kubernetes.api.model.PodBuilder
;
import
io.fabric8.kubernetes.api.model.PodList
;
import
io.fabric8.kubernetes.api.model.PodList
;
import
io.fabric8.kubernetes.api.model.PodStatusBuilder
;
import
io.fabric8.kubernetes.api.model.PodStatusBuilder
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
...
@@ -72,9 +72,7 @@ import org.junit.Assert;
...
@@ -72,9 +72,7 @@ import org.junit.Assert;
import
org.junit.Before
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.Test
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Map
;
import
java.util.Map
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.CompletableFuture
;
...
@@ -265,12 +263,17 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
...
@@ -265,12 +263,17 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
public
void
testGetWorkerNodesFromPreviousAttempts
()
throws
Exception
{
public
void
testGetWorkerNodesFromPreviousAttempts
()
throws
Exception
{
// Prepare pod of previous attempt
// Prepare pod of previous attempt
final
String
previewPodName
=
CLUSTER_ID
+
"-taskmanager-1-1"
;
final
String
previewPodName
=
CLUSTER_ID
+
"-taskmanager-1-1"
;
flinkKubeClient
.
createTaskManagerPod
(
new
TaskManagerPodParameter
(
previewPodName
,
final
Pod
mockTaskManagerPod
=
new
PodBuilder
()
new
ArrayList
<>(),
.
editOrNewMetadata
()
1024
,
.
withName
(
previewPodName
)
1
,
.
withLabels
(
KubernetesUtils
.
getTaskManagerLabels
(
CLUSTER_ID
))
new
HashMap
<>()));
.
endMetadata
()
.
editOrNewSpec
()
.
endSpec
()
.
build
();
flinkKubeClient
.
createTaskManagerPod
(
new
KubernetesPod
(
mockTaskManagerPod
));
assertEquals
(
1
,
kubeClient
.
pods
().
list
().
getItems
().
size
());
assertEquals
(
1
,
kubeClient
.
pods
().
list
().
getItems
().
size
());
// Call initialize method to recover worker nodes from previous attempt.
// Call initialize method to recover worker nodes from previous attempt.
...
...
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
浏览文件 @
f6ad9685
...
@@ -18,33 +18,236 @@
...
@@ -18,33 +18,236 @@
package
org.apache.flink.kubernetes.kubeclient
;
package
org.apache.flink.kubernetes.kubeclient
;
import
org.apache.flink.client.deployment.ClusterSpecification
;
import
org.apache.flink.configuration.BlobServerOptions
;
import
org.apache.flink.configuration.JobManagerOptions
;
import
org.apache.flink.configuration.RestOptions
;
import
org.apache.flink.kubernetes.KubernetesTestBase
;
import
org.apache.flink.kubernetes.KubernetesTestBase
;
import
org.apache.flink.kubernetes.KubernetesTestUtils
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
;
import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal
;
import
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
;
import
org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory
;
import
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters
;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod
;
import
org.apache.flink.kubernetes.utils.KubernetesUtils
;
import
io.fabric8.kubernetes.api.model.ConfigMap
;
import
io.fabric8.kubernetes.api.model.HasMetadata
;
import
io.fabric8.kubernetes.api.model.LoadBalancerIngress
;
import
io.fabric8.kubernetes.api.model.LoadBalancerStatus
;
import
io.fabric8.kubernetes.api.model.OwnerReference
;
import
io.fabric8.kubernetes.api.model.Pod
;
import
io.fabric8.kubernetes.api.model.PodBuilder
;
import
io.fabric8.kubernetes.api.model.Service
;
import
io.fabric8.kubernetes.api.model.ServiceBuilder
;
import
io.fabric8.kubernetes.api.model.ServiceStatusBuilder
;
import
io.fabric8.kubernetes.api.model.apps.Deployment
;
import
org.junit.Before
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.Test
;
import
javax.annotation.Nullable
;
import
java.util.Collections
;
import
java.util.List
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertNotNull
;
import
static
org
.
junit
.
Assert
.
assertNull
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
/**
/**
* Tests for Fabric implementation of {@link FlinkKubeClient}.
* Tests for Fabric implementation of {@link FlinkKubeClient}.
*/
*/
public
class
Fabric8FlinkKubeClientTest
extends
KubernetesTestBase
{
public
class
Fabric8FlinkKubeClientTest
extends
KubernetesTestBase
{
private
static
final
int
REST_PORT
=
9081
;
private
static
final
int
RPC_PORT
=
7123
;
private
static
final
int
BLOB_SERVER_PORT
=
8346
;
private
static
final
double
JOB_MANAGER_CPU
=
2.0
;
private
static
final
int
JOB_MANAGER_MEMORY
=
768
;
private
static
final
String
SERVICE_ACCOUNT_NAME
=
"service-test"
;
private
static
final
String
ENTRY_POINT_CLASS
=
KubernetesSessionClusterEntrypoint
.
class
.
getCanonicalName
();
private
KubernetesJobManagerSpecification
kubernetesJobManagerSpecification
;
@Before
@Before
public
void
setup
()
throws
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
super
.
setup
();
KubernetesTestUtils
.
createTemporyFile
(
"some data"
,
flinkConfDir
,
"logback.xml"
);
KubernetesTestUtils
.
createTemporyFile
(
"some data"
,
flinkConfDir
,
"log4j.properties"
);
flinkConfig
.
set
(
KubernetesConfigOptions
.
CONTAINER_IMAGE_PULL_POLICY
,
CONTAINER_IMAGE_PULL_POLICY
);
flinkConfig
.
set
(
KubernetesConfigOptionsInternal
.
ENTRY_POINT_CLASS
,
ENTRY_POINT_CLASS
);
flinkConfig
.
set
(
RestOptions
.
PORT
,
REST_PORT
);
flinkConfig
.
set
(
JobManagerOptions
.
PORT
,
RPC_PORT
);
flinkConfig
.
set
(
BlobServerOptions
.
PORT
,
Integer
.
toString
(
BLOB_SERVER_PORT
));
flinkConfig
.
set
(
KubernetesConfigOptions
.
JOB_MANAGER_CPU
,
JOB_MANAGER_CPU
);
flinkConfig
.
set
(
KubernetesConfigOptions
.
JOB_MANAGER_SERVICE_ACCOUNT
,
SERVICE_ACCOUNT_NAME
);
final
ClusterSpecification
clusterSpecification
=
new
ClusterSpecification
.
ClusterSpecificationBuilder
()
.
setMasterMemoryMB
(
JOB_MANAGER_MEMORY
)
.
setTaskManagerMemoryMB
(
1000
)
.
setSlotsPerTaskManager
(
3
)
.
createClusterSpecification
();
final
KubernetesJobManagerParameters
kubernetesJobManagerParameters
=
new
KubernetesJobManagerParameters
(
flinkConfig
,
clusterSpecification
);
this
.
kubernetesJobManagerSpecification
=
KubernetesJobManagerFactory
.
createJobManagerComponent
(
kubernetesJobManagerParameters
);
}
}
@Test
@Test
public
void
testCreateTaskManagerPod
()
{
public
void
testCreateFlinkMasterComponent
()
throws
Exception
{
// todo
flinkKubeClient
.
createJobManagerComponent
(
this
.
kubernetesJobManagerSpecification
);
final
List
<
Deployment
>
resultedDeployments
=
kubeClient
.
apps
().
deployments
()
.
inNamespace
(
NAMESPACE
)
.
list
()
.
getItems
();
assertEquals
(
1
,
resultedDeployments
.
size
());
final
List
<
ConfigMap
>
resultedConfigMaps
=
kubeClient
.
configMaps
()
.
inNamespace
(
NAMESPACE
)
.
list
()
.
getItems
();
assertEquals
(
1
,
resultedConfigMaps
.
size
());
final
List
<
Service
>
resultedServices
=
kubeClient
.
services
()
.
inNamespace
(
NAMESPACE
)
.
list
()
.
getItems
();
assertEquals
(
2
,
resultedServices
.
size
());
testOwnerReferenceSetting
(
resultedDeployments
.
get
(
0
),
resultedConfigMaps
);
testOwnerReferenceSetting
(
resultedDeployments
.
get
(
0
),
resultedServices
);
}
private
<
T
extends
HasMetadata
>
void
testOwnerReferenceSetting
(
HasMetadata
ownerReference
,
List
<
T
>
resources
)
{
resources
.
forEach
(
resource
->
{
List
<
OwnerReference
>
ownerReferences
=
resource
.
getMetadata
().
getOwnerReferences
();
assertEquals
(
1
,
ownerReferences
.
size
());
assertEquals
(
ownerReference
.
getMetadata
().
getUid
(),
ownerReferences
.
get
(
0
).
getUid
());
});
}
@Test
public
void
testCreateFlinkTaskManagerPod
()
throws
Exception
{
this
.
flinkKubeClient
.
createJobManagerComponent
(
this
.
kubernetesJobManagerSpecification
);
final
KubernetesPod
kubernetesPod
=
new
KubernetesPod
(
new
PodBuilder
()
.
editOrNewMetadata
()
.
withName
(
"mock-task-manager-pod"
)
.
endMetadata
()
.
editOrNewSpec
()
.
endSpec
()
.
build
());
this
.
flinkKubeClient
.
createTaskManagerPod
(
kubernetesPod
);
final
Pod
resultTaskManagerPod
=
this
.
kubeClient
.
pods
().
inNamespace
(
NAMESPACE
).
withName
(
"mock-task-manager-pod"
).
get
();
assertEquals
(
this
.
kubeClient
.
apps
().
deployments
().
inNamespace
(
NAMESPACE
).
list
().
getItems
().
get
(
0
).
getMetadata
().
getUid
(),
resultTaskManagerPod
.
getMetadata
().
getOwnerReferences
().
get
(
0
).
getUid
());
}
}
@Test
@Test
public
void
testServiceLoadBalancerWithNoIP
()
throws
Exception
{
public
void
testStopPod
()
{
// todo
final
String
podName
=
"pod-for-delete"
;
final
Pod
pod
=
new
PodBuilder
()
.
editOrNewMetadata
()
.
withName
(
podName
)
.
endMetadata
()
.
editOrNewSpec
()
.
endSpec
()
.
build
();
this
.
kubeClient
.
pods
().
inNamespace
(
NAMESPACE
).
create
(
pod
);
assertNotNull
(
this
.
kubeClient
.
pods
().
inNamespace
(
NAMESPACE
).
withName
(
podName
).
get
());
this
.
flinkKubeClient
.
stopPod
(
podName
);
assertNull
(
this
.
kubeClient
.
pods
().
inNamespace
(
NAMESPACE
).
withName
(
podName
).
get
());
}
}
@Test
@Test
public
void
testServiceLoadBalancerEmptyHostAndIP
()
throws
Exception
{
public
void
testServiceLoadBalancerWithNoIP
()
{
// todo
final
String
hostName
=
"test-host-name"
;
mockRestServiceWithLB
(
hostName
,
""
);
final
Endpoint
resultEndpoint
=
flinkKubeClient
.
getRestEndpoint
(
CLUSTER_ID
);
assertEquals
(
hostName
,
resultEndpoint
.
getAddress
());
assertEquals
(
REST_PORT
,
resultEndpoint
.
getPort
());
}
@Test
public
void
testServiceLoadBalancerEmptyHostAndIP
()
{
mockRestServiceWithLB
(
""
,
""
);
final
Endpoint
resultEndpoint1
=
flinkKubeClient
.
getRestEndpoint
(
CLUSTER_ID
);
assertNull
(
resultEndpoint1
);
}
@Test
public
void
testServiceLoadBalancerNullHostAndIP
()
{
mockRestServiceWithLB
(
null
,
null
);
final
Endpoint
resultEndpoint2
=
flinkKubeClient
.
getRestEndpoint
(
CLUSTER_ID
);
assertNull
(
resultEndpoint2
);
}
@Test
public
void
testStopAndCleanupCluster
()
throws
Exception
{
this
.
flinkKubeClient
.
createJobManagerComponent
(
this
.
kubernetesJobManagerSpecification
);
final
KubernetesPod
kubernetesPod
=
new
KubernetesPod
(
new
PodBuilder
()
.
editOrNewMetadata
()
.
withName
(
"mock-task-manager-pod"
)
.
endMetadata
()
.
editOrNewSpec
()
.
endSpec
()
.
build
());
this
.
flinkKubeClient
.
createTaskManagerPod
(
kubernetesPod
);
assertEquals
(
1
,
this
.
kubeClient
.
apps
().
deployments
().
inNamespace
(
NAMESPACE
).
list
().
getItems
().
size
());
assertEquals
(
1
,
this
.
kubeClient
.
configMaps
().
inNamespace
(
NAMESPACE
).
list
().
getItems
().
size
());
assertEquals
(
2
,
this
.
kubeClient
.
services
().
inNamespace
(
NAMESPACE
).
list
().
getItems
().
size
());
assertEquals
(
1
,
this
.
kubeClient
.
pods
().
inNamespace
(
NAMESPACE
).
list
().
getItems
().
size
());
this
.
flinkKubeClient
.
stopAndCleanupCluster
(
CLUSTER_ID
);
assertTrue
(
this
.
kubeClient
.
apps
().
deployments
().
inNamespace
(
NAMESPACE
).
list
().
getItems
().
isEmpty
());
}
private
void
mockRestServiceWithLB
(
@Nullable
String
hostname
,
@Nullable
String
ip
)
{
final
String
restServiceName
=
KubernetesUtils
.
getRestServiceName
(
CLUSTER_ID
);
final
String
path
=
String
.
format
(
"/api/v1/namespaces/%s/services/%s"
,
NAMESPACE
,
restServiceName
);
server
.
expect
()
.
withPath
(
path
)
.
andReturn
(
200
,
buildMockRestServiceWithLB
(
hostname
,
ip
))
.
always
();
final
Service
resultService
=
kubeClient
.
services
()
.
inNamespace
(
NAMESPACE
)
.
withName
(
KubernetesUtils
.
getRestServiceName
(
CLUSTER_ID
))
.
get
();
assertNotNull
(
resultService
);
}
private
Service
buildMockRestServiceWithLB
(
@Nullable
String
hostname
,
@Nullable
String
ip
)
{
final
Service
service
=
new
ServiceBuilder
()
.
build
();
service
.
setStatus
(
new
ServiceStatusBuilder
()
.
withLoadBalancer
(
new
LoadBalancerStatus
(
Collections
.
singletonList
(
new
LoadBalancerIngress
(
hostname
,
ip
)))).
build
());
return
service
;
}
}
}
}
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactoryTest.java
浏览文件 @
f6ad9685
...
@@ -46,7 +46,7 @@ public class KubernetesTaskManagerFactoryTest extends KubernetesTaskManagerTestB
...
@@ -46,7 +46,7 @@ public class KubernetesTaskManagerFactoryTest extends KubernetesTaskManagerTestB
KubernetesTestUtils
.
createTemporyFile
(
"some data"
,
flinkConfDir
,
"log4j.properties"
);
KubernetesTestUtils
.
createTemporyFile
(
"some data"
,
flinkConfDir
,
"log4j.properties"
);
this
.
resultPod
=
this
.
resultPod
=
KubernetesTaskManagerFactory
.
build
TaskManagerComponent
(
kubernetesTaskManagerParameters
).
getInternalResource
();
KubernetesTaskManagerFactory
.
create
TaskManagerComponent
(
kubernetesTaskManagerParameters
).
getInternalResource
();
}
}
@Test
@Test
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录