提交 25d52ebd 编写于 作者: S Sanjeev Kulkarni 提交者: Sijie Guo

Have the ability to specify custom labels for k8 jobs (#2702)

### Motivation

This pr will allow functions being deployed as kubernetes jobs have installation defined labels.
上级 3bdd6ecf
......@@ -83,6 +83,7 @@ class KubernetesRuntime implements Runtime {
private InstanceControlGrpc.InstanceControlFutureStub[] stub;
private InstanceConfig instanceConfig;
private final String jobNamespace;
private final Map<String, String> customLabels;
private final String pulsarDockerImageName;
private final String pulsarRootDir;
private final String userCodePkgUrl;
......@@ -94,6 +95,7 @@ class KubernetesRuntime implements Runtime {
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
String jobNamespace,
Map<String, String> customLabels,
String pulsarDockerImageName,
String pulsarRootDir,
InstanceConfig instanceConfig,
......@@ -109,6 +111,7 @@ class KubernetesRuntime implements Runtime {
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
this.jobNamespace = jobNamespace;
this.customLabels = customLabels;
this.pulsarDockerImageName = pulsarDockerImageName;
this.pulsarRootDir = pulsarRootDir;
this.userCodePkgUrl = userCodePkgUrl;
......@@ -450,6 +453,9 @@ class KubernetesRuntime implements Runtime {
labels.put("app", createJobName(functionDetails));
labels.put("namespace", functionDetails.getNamespace());
labels.put("tenant", functionDetails.getTenant());
if (customLabels != null && !customLabels.isEmpty()) {
labels.putAll(customLabels);
}
return labels;
}
......
......@@ -30,6 +30,8 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
/**
......@@ -43,6 +45,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final String pulsarDockerImageName;
private final String pulsarRootDir;
private final Boolean submittingInsidePod;
private final Map<String, String> customLabels;
private final String pulsarAdminUri;
private final String pulsarServiceUri;
private final String stateStorageServiceUri;
......@@ -59,6 +62,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
String pulsarDockerImageName,
String pulsarRootDir,
Boolean submittingInsidePod,
Map<String, String> customLabels,
String pulsarServiceUri,
String pulsarAdminUri,
String stateStorageServiceUri,
......@@ -80,6 +84,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.pulsarRootDir = "/pulsar";
}
this.submittingInsidePod = submittingInsidePod;
this.customLabels = customLabels;
this.pulsarServiceUri = pulsarServiceUri;
this.pulsarAdminUri = pulsarAdminUri;
this.stateStorageServiceUri = stateStorageServiceUri;
......@@ -113,6 +118,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
appsClient,
coreClient,
jobNamespace,
customLabels,
pulsarDockerImageName,
pulsarRootDir,
instanceConfig,
......
......@@ -68,7 +68,7 @@ public class KubernetesRuntimeTest {
this.stateStorageServiceUrl = "bk://localhost:4181";
this.logDirectory = "logs/functions";
this.factory = new KubernetesRuntimeFactory(null, null, null, null,
false, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null);
false, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null);
}
@AfterMethod
......
......@@ -125,6 +125,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
workerConfig.getKubernetesContainerFactory().getCustomLabels(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
workerConfig.getStateStorageServiceUrl(),
......
......@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
......@@ -138,6 +139,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private Boolean submittingInsidePod;
private String pulsarServiceUrl;
private String pulsarAdminUrl;
private Map<String, String> customLabels;
}
private KubernetesContainerFactory kubernetesContainerFactory;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册