提交 b50760ca 编写于 作者: B Boyang Jerry Peng 提交者: Matteo Merli

Implementing authentication for Pulsar Functions (#3735)

* Implementing authentication for Pulsar Functions

* delete unnecessary changes

* cleaning up

* improving implementation

* fixing tests

* cleaning up

* add no op implementation

* cleaning up unnecessary changes

* refactoring based on comments

* adding comments

* change data from string type to bytes

* add proto file

* addressing comments

* up merging

* refactoring get token code

* cleaning up

* fix bugs and add tests

* add tests

* remove service account creation

* cleanup unused imports

* add field for auth provider

* adding comments
上级 4fe7ab4f
......@@ -76,7 +76,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
return parseToken(token);
}
private String getToken(AuthenticationDataSource authData) throws AuthenticationException {
public static String getToken(AuthenticationDataSource authData) throws AuthenticationException {
if (authData.hasDataFromCommand()) {
// Authenticate Pulsar binary connection
return authData.getCommandData();
......@@ -96,7 +96,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
}
}
private String validateToken(final String token) throws AuthenticationException {
private static String validateToken(final String token) throws AuthenticationException {
if (StringUtils.isNotBlank(token)) {
return token;
} else {
......
......@@ -82,7 +82,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, null, functionConfigJson, clientAppId());
functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}
@PUT
......
......@@ -79,7 +79,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, sinkConfigJson, clientAppId());
functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}
@PUT
......
......@@ -79,7 +79,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @FormDataParam("sourceConfig") String sourceConfigJson) {
source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, null, sourceConfigJson, clientAppId());
functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}
@PUT
......
......@@ -23,6 +23,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
/**
......@@ -40,6 +41,7 @@ public class InstanceConfig {
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
private int port;
private String clusterName;
......
......@@ -141,6 +141,21 @@ message FunctionMetaData {
uint64 version = 3;
uint64 createTime = 4;
map<int32, FunctionState> instanceStates = 5;
FunctionAuthenticationSpec functionAuthSpec = 6;
}
message FunctionAuthenticationSpec {
/**
* function authentication related data that the function authentication provider
* needs to cache/distribute to all workers support function authentication.
* Depending on the function authentication provider implementation, this can be the actual auth credentials
* or a pointer to the auth credentials that this function should use
*/
bytes data = 1;
/**
* classname of the function auth provicer this data is relevant to
*/
string provider = 2;
}
message Instance {
......
......@@ -63,6 +63,12 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import java.util.Optional;
import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
public class ClearTextFunctionTokenAuthProvider implements FunctionAuthProvider {
@Override
public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
authConfig.setClientAuthenticationParameters("token:" + new String(functionAuthData.getData()));
}
@Override
public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception {
String token = null;
try {
token = getToken(authenticationDataSource);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (token != null) {
return Optional.of(FunctionAuthData.builder().data(token.getBytes()).build());
}
return null;
}
@Override
public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
//no-op
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
/**
* A wrapper for authentication data for functions
*/
public class FunctionAuthData {
/**
* function authentication related data that the function authentication provider
* needs to cache/distribute to all workers support function authentication.
* Depending on the function authentication provider implementation, this can be the actual auth credentials
* or a pointer to the auth credentials that this function should use
*/
private byte[] data;
/**
* classname of the function auth provicer this data is relevant to
*/
private String provider;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import java.util.Optional;
/**
* This is a generic interface that functions can use to cache and distribute appropriate authentication
* data that is needed to configure the runtime of functions to support appropriate authentication of function instances
*/
public interface FunctionAuthProvider {
/**
* Set authentication configs for function instance based on the data in FunctionAuthenticationSpec
* @param authConfig authentication configs passed to the function instance
* @param functionAuthData function authentication data that is provider specific
*/
void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData);
/**
* Cache auth data in as part of function metadata for function that runtime may need to configure authentication
* @param tenant tenant that the function is running under
* @param namespace namespace that is the function is running under
* @param name name of the function
* @param authenticationDataSource auth data
* @return
* @throws Exception
*/
Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception;
/**
* Clean up operation for auth when function is terminated
* @param tenant tenant that the function is running under
* @param namespace namespace that is the function is running under
* @param name name of the function
* @param functionAuthData function auth data
* @throws Exception
*/
void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import org.apache.pulsar.functions.proto.Function;
public final class FunctionAuthUtils {
public static final FunctionAuthData getFunctionAuthData(Function.FunctionAuthenticationSpec functionAuthenticationSpec) {
return FunctionAuthData.builder().data(functionAuthenticationSpec.getData().toByteArray()).build();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import io.kubernetes.client.models.V1ServiceAccount;
import io.kubernetes.client.models.V1StatefulSet;
/**
* Kubernetes runtime specific functions authentication provider
*/
public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
/**
* Configure function statefulset spec based on function auth data
* @param statefulSet statefulset spec for function
* @param functionAuthData function auth data
*/
void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1DeleteOptions;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1Secret;
import io.kubernetes.client.models.V1SecretVolumeSource;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1Volume;
import io.kubernetes.client.models.V1VolumeMount;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
@Slf4j
public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAuthProvider {
private static final int NUM_RETRIES = 5;
private static final long SLEEP_BETWEEN_RETRIES_MS = 500;
private static final String SECRET_NAME = "function-auth";
private static final String DEFAULT_SECRET_MOUNT_DIR = "/etc/auth";
private static final String FUNCTION_AUTH_TOKEN = "token";
private final CoreV1Api coreClient;
private final String kubeNamespace;
public KubernetesSecretsTokenAuthProvider(CoreV1Api coreClient, String kubeNamespace) {
this.coreClient = coreClient;
this.kubeNamespace = kubeNamespace;
}
@Override
public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData) {
V1PodSpec podSpec = statefulSet.getSpec().getTemplate().getSpec();
// configure pod mount secret with auth token
podSpec.setVolumes(Collections.singletonList(
new V1Volume()
.name(SECRET_NAME)
.secret(
new V1SecretVolumeSource()
.secretName(getSecretName(new String(functionAuthData.getData())))
.defaultMode(256))));
podSpec.getContainers().forEach(container -> container.setVolumeMounts(Collections.singletonList(
new V1VolumeMount()
.name(SECRET_NAME)
.mountPath(DEFAULT_SECRET_MOUNT_DIR)
.readOnly(true))));
}
@Override
public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
authConfig.setClientAuthenticationParameters(String.format("file://%s/%s", DEFAULT_SECRET_MOUNT_DIR, FUNCTION_AUTH_TOKEN));
}
@Override
public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name,
AuthenticationDataSource authenticationDataSource) {
String id = null;
try {
String token = getToken(authenticationDataSource);
if (token != null) {
id = createSecret(token, tenant, namespace, name);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
if (id != null) {
return Optional.of(FunctionAuthData.builder().data(id.getBytes()).build());
}
return Optional.empty();
}
@Override
public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
String secretName = new String(functionAuthData.getData());
Actions.Action deleteSecrets = Actions.Action.builder()
.actionName(String.format("Deleting secrets for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
try {
V1DeleteOptions v1DeleteOptions = new V1DeleteOptions();
v1DeleteOptions.setGracePeriodSeconds(0L);
v1DeleteOptions.setPropagationPolicy("Foreground");
coreClient.deleteNamespacedSecret(secretName,
kubeNamespace, v1DeleteOptions, "true",
null, null, null);
} catch (ApiException e) {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
log.warn("Secrets for function {} does not exist", fqfn);
return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
return Actions.ActionResult.builder().success(true).build();
})
.build();
Actions.Action waitForSecretsDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for secrets for function %s to complete deletion", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
try {
coreClient.readNamespacedSecret(secretName, kubeNamespace,
null, null, null);
} catch (ApiException e) {
// statefulset is gone
if (e.getCode() == HTTP_NOT_FOUND) {
return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
return Actions.ActionResult.builder()
.success(false)
.build();
})
.build();
AtomicBoolean success = new AtomicBoolean(false);
Actions.newBuilder()
.addAction(deleteSecrets.toBuilder()
.continueOn(true)
.build())
.addAction(waitForSecretsDeletion.toBuilder()
.continueOn(false)
.onSuccess(ignore -> success.set(true))
.build())
.addAction(deleteSecrets.toBuilder()
.continueOn(true)
.build())
.addAction(waitForSecretsDeletion.toBuilder()
.onSuccess(ignore -> success.set(true))
.build())
.run();
if (!success.get()) {
throw new RuntimeException(String.format("Failed to delete secrets for function %s", fqfn));
}
}
private String createSecret(String token, String tenant, String namespace, String name) throws ApiException, InterruptedException {
StringBuilder sb = new StringBuilder();
Actions.Action createAuthSecret = Actions.Action.builder()
.actionName(String.format("Creating authentication secret for function %s/%s/%s", tenant, namespace, name))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
String id = RandomStringUtils.random(5, true, true).toLowerCase();
V1Secret v1Secret = new V1Secret()
.metadata(new V1ObjectMeta().name(getSecretName(id)))
.data(Collections.singletonMap(FUNCTION_AUTH_TOKEN, token.getBytes()));
try {
coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true");
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
return Actions.ActionResult.builder()
.errorMsg(String.format("Secret %s already present", id))
.success(false)
.build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
sb.append(id.toCharArray());
return Actions.ActionResult.builder().success(true).build();
})
.build();
AtomicBoolean success = new AtomicBoolean(false);
Actions.newBuilder()
.addAction(createAuthSecret.toBuilder()
.onSuccess(ignore -> success.set(true))
.build())
.run();
if (!success.get()) {
throw new RuntimeException(String.format("Failed to create authentication secret for function %s/%s/%s", tenant, namespace, name));
}
return sb.toString();
}
private String getSecretName(String id) {
return "pf-secret-" + id;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import java.util.Optional;
public class NoOpFunctionAuthProvider implements FunctionAuthProvider{
@Override
public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
}
@Override
public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name,
AuthenticationDataSource authenticationDataSource)
throws Exception {
return Optional.empty();
}
@Override
public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
}
}
......@@ -52,6 +52,7 @@ import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
......@@ -80,6 +81,8 @@ import java.util.regex.Pattern;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
/**
* Kubernetes based runtime for running functions.
......@@ -94,9 +97,6 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
@VisibleForTesting
public class KubernetesRuntime implements Runtime {
private static int NUM_RETRIES = 5;
private static long SLEEP_BETWEEN_RETRIES_MS = 500;
private static final String ENV_SHARD_ID = "SHARD_ID";
private static final int maxJobNameSize = 55;
private static final Integer GRPC_PORT = 9093;
......@@ -134,7 +134,8 @@ public class KubernetesRuntime implements Runtime {
private final String pulsarAdminUrl;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private int percentMemoryPadding;
private final KubernetesFunctionAuthProvider functionAuthDataCacheProvider;
private final AuthenticationConfig authConfig;
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
......@@ -158,7 +159,8 @@ public class KubernetesRuntime implements Runtime {
AuthenticationConfig authConfig,
SecretsProviderConfigurator secretsProviderConfigurator,
Integer expectedMetricsCollectionInterval,
int percentMemoryPadding) throws Exception {
int percentMemoryPadding,
KubernetesFunctionAuthProvider functionAuthDataCacheProvider) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
......@@ -187,11 +189,16 @@ public class KubernetesRuntime implements Runtime {
break;
}
this.authConfig = authConfig;
this.functionAuthDataCacheProvider = functionAuthDataCacheProvider;
this.processArgs = new LinkedList<>();
this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
// use exec to to launch function so that it gets launched in the foreground with the same PID as shell
// so that when we kill the pod, the signal will get propagated to the function code
this.processArgs.add("exec");
this.processArgs.addAll(
RuntimeUtils.getCmd(
instanceConfig,
......@@ -221,16 +228,20 @@ public class KubernetesRuntime implements Runtime {
*/
@Override
public void start() throws Exception {
submitService();
try {
submitService();
submitStatefulSet();
} catch (Exception e) {
log.error("Could not submit statefulset for {}/{}/{}, deleting service as well",
log.error("Failed start function {}/{}/{} in Kubernetes",
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(), e);
deleteService();
stop();
throw e;
}
if (channel == null && stub == null) {
channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
......@@ -363,8 +374,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action createService = Actions.Action.builder()
.actionName(String.format("Submitting service for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
final V1Service response;
try {
......@@ -428,6 +439,11 @@ public class KubernetesRuntime implements Runtime {
private void submitStatefulSet() throws Exception {
final V1StatefulSet statefulSet = createStatefulSet();
// Configure function authentication if needed
if (instanceConfig.getFunctionAuthenticationSpec() != null) {
functionAuthDataCacheProvider.configureAuthDataStatefulSet(
statefulSet, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
}
log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
......@@ -435,8 +451,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action createStatefulSet = Actions.Action.builder()
.actionName(String.format("Submitting statefulset for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
final V1StatefulSet response;
try {
......@@ -482,8 +498,8 @@ public class KubernetesRuntime implements Runtime {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Actions.Action deleteStatefulSet = Actions.Action.builder()
.actionName(String.format("Deleting statefulset for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
Response response;
try {
......@@ -531,8 +547,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action waitForStatefulSetDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
// set retry period to be about 2x the graceshutdown time
.numRetries(NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS* 2)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS* 2)
.supplier(() -> {
V1StatefulSet response;
try {
......@@ -560,8 +576,8 @@ public class KubernetesRuntime implements Runtime {
// Need to wait for all pods to die so we can cleanup subscriptions.
Actions.Action waitForStatefulPodsToTerminate = Actions.Action.builder()
.actionName(String.format("Waiting for pods for function %s to terminate", fqfn))
.numRetries(NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS * 2)
.supplier(() -> {
String labels = String.format("tenant=%s,namespace=%s,name=%s",
instanceConfig.getFunctionDetails().getTenant(),
......@@ -633,8 +649,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action deleteService = Actions.Action.builder()
.actionName(String.format("Deleting service for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
final Response response;
try {
......@@ -679,8 +695,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action waitForServiceDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
V1Service response;
try {
......@@ -738,6 +754,28 @@ public class KubernetesRuntime implements Runtime {
}
private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
// add auth plugin and parameters if necessary
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
return Arrays.asList(
pulsarRootDir + "/bin/pulsar-admin",
"--auth-plugin",
authConfig.getClientAuthenticationPlugin(),
"--auth-params",
authConfig.getClientAuthenticationParameters(),
"--admin-url",
pulsarAdminUrl,
"functions",
"download",
"--path",
bkPath,
"--destination-file",
userCodeFilePath);
}
}
return Arrays.asList(
pulsarRootDir + "/bin/pulsar-admin",
"--admin-url",
......@@ -795,6 +833,7 @@ public class KubernetesRuntime implements Runtime {
statefulSet.spec(statefulSetSpec);
return statefulSet;
}
......
......@@ -27,12 +27,15 @@ import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.util.Config;
import java.nio.file.Paths;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
......@@ -44,6 +47,7 @@ import java.util.Timer;
import java.util.TimerTask;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
/**
* Kubernetes based function container factory implementation.
......@@ -51,6 +55,9 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
@Slf4j
public class KubernetesRuntimeFactory implements RuntimeFactory {
static int NUM_RETRIES = 5;
static long SLEEP_BETWEEN_RETRIES_MS = 500;
@Getter
@Setter
@NoArgsConstructor
......@@ -158,6 +165,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.functionInstanceMinResources = functionInstanceMinResources;
try {
setupClient();
} catch (Exception e) {
log.error("Failed to setup client", e);
throw new RuntimeException(e);
}
}
@Override
......@@ -169,7 +182,6 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
String originalCodeFileName,
Long expectedHealthCheckInterval) throws Exception {
setupClient();
String instanceFile;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
......@@ -181,6 +193,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
default:
throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
}
// adjust the auth config to support auth
if (instanceConfig.getFunctionAuthenticationSpec() != null) {
getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
}
return new KubernetesRuntime(
appsClient,
coreClient,
......@@ -204,7 +222,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
authConfig,
secretsProviderConfigurator,
expectedMetricsCollectionInterval,
this.kubernetesInfo.getPercentMemoryPadding());
this.kubernetesInfo.getPercentMemoryPadding(),
getAuthProvider());
}
@Override
......@@ -215,16 +234,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
KubernetesRuntime.doChecks(functionDetails);
validateMinResourcesRequired(functionDetails);
try {
setupClient();
} catch (Exception e) {
throw new RuntimeException(e);
}
secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(), functionDetails);
}
@VisibleForTesting
void setupClient() throws Exception {
public void setupClient() throws Exception {
if (appsClient == null) {
if (this.kubernetesInfo.getK8Uri() == null) {
log.info("k8Uri is null thus going by defaults");
......@@ -309,4 +323,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
}
}
}
@Override
public KubernetesFunctionAuthProvider getAuthProvider() {
return new KubernetesSecretsTokenAuthProvider(coreClient, kubernetesInfo.jobNamespace);
}
}
......@@ -29,6 +29,8 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import java.nio.file.Paths;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
/**
* Thread based function container factory implementation.
*/
......@@ -125,6 +127,12 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
default:
throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
}
// configure auth if necessary
if (instanceConfig.getFunctionAuthenticationSpec() != null) {
getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
}
return new ProcessRuntime(
instanceConfig,
instanceFile,
......
......@@ -35,6 +35,10 @@ public interface Runtime {
void stop() throws Exception;
default void terminate() throws Exception {
stop();
}
boolean isAlive();
Throwable getDeathException();
......
......@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.runtime;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.NoOpFunctionAuthProvider;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
......@@ -43,7 +45,12 @@ public interface RuntimeFactory extends AutoCloseable {
default void doAdmissionChecks(Function.FunctionDetails functionDetails) { }
default FunctionAuthProvider getAuthProvider() throws IllegalAccessException, InstantiationException {
return NoOpFunctionAuthProvider.class.newInstance();
}
@Override
void close();
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Optional;
public class ClearTextFunctionTokenAuthProviderTest {
@Test
public void testClearTextAuth() throws Exception {
ClearTextFunctionTokenAuthProvider clearTextFunctionTokenAuthProvider = new ClearTextFunctionTokenAuthProvider();
Optional<FunctionAuthData> functionAuthData = clearTextFunctionTokenAuthProvider.cacheAuthData("test-tenant",
"test-ns", "test-func", new AuthenticationDataSource() {
@Override
public boolean hasDataFromCommand() {
return true;
}
@Override
public String getCommandData() {
return "test-token";
}
});
Assert.assertTrue(functionAuthData.isPresent());
Assert.assertEquals(functionAuthData.get().getData(), "test-token".getBytes());
AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
clearTextFunctionTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, functionAuthData.get());
Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "token:test-token");
AuthenticationToken authenticationToken = new AuthenticationToken();
authenticationToken.configure(authenticationConfig.getClientAuthenticationParameters());
Assert.assertEquals(authenticationToken.getAuthData().getCommandData(), "test-token");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.auth;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1PodTemplateSpec;
import io.kubernetes.client.models.V1Secret;
import io.kubernetes.client.models.V1ServiceAccount;
import io.kubernetes.client.models.V1StatefulSet;
import io.kubernetes.client.models.V1StatefulSetSpec;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Optional;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class KubernetesSecretsTokenAuthProviderTest {
@Test
public void testConfigureAuthDataStatefulSet() {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
V1StatefulSet statefulSet = new V1StatefulSet();
statefulSet.setSpec(
new V1StatefulSetSpec().template(
new V1PodTemplateSpec().spec(
new V1PodSpec().containers(
Collections.singletonList(new V1Container())))));
FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
kubernetesSecretsTokenAuthProvider.configureAuthDataStatefulSet(statefulSet, functionAuthData);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getName(), "function-auth");
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getSecret().getSecretName(), "pf-secret-foo");
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().size(), 1);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().size(), 1);
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(), "function-auth");
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(), "/etc/auth");
}
@Test
public void testCacheAuthData() throws ApiException {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.cacheAuthData("test-tenant",
"test-ns", "test-func", new AuthenticationDataSource() {
@Override
public boolean hasDataFromCommand() {
return true;
}
@Override
public String getCommandData() {
return "test-token";
}
});
Assert.assertTrue(functionAuthData.isPresent());
Assert.assertTrue(StringUtils.isNotBlank(new String(functionAuthData.get().getData())));
}
@Test
public void configureAuthenticationConfig() {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
kubernetesSecretsTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, functionAuthData);
Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "file:///etc/auth/token");
}
}
......@@ -165,8 +165,6 @@ public class KubernetesRuntimeFactoryTest {
factory = createKubernetesRuntimeFactory(null, null);
FunctionDetails functionDetails = createFunctionDetails();
factory.doAdmissionChecks(functionDetails);
verify(factory, times(1)).setupClient();
}
@Test
......
......@@ -68,6 +68,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.Utils.getSinkType;
import static org.apache.pulsar.functions.utils.Utils.getSourceType;
......@@ -154,6 +155,7 @@ public class FunctionActioner {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(),
instance.getFunctionMetaData().getFunctionAuthSpec(),
instanceId, workerConfig.getPulsarFunctionsCluster());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
......@@ -164,7 +166,8 @@ public class FunctionActioner {
}
InstanceConfig createInstanceConfig(FunctionDetails functionDetails, int instanceId, String clusterName) {
InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.FunctionAuthenticationSpec
functionAuthSpec, int instanceId, String clusterName) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: set correct function id and version when features implemented
......@@ -174,6 +177,7 @@ public class FunctionActioner {
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
instanceConfig.setClusterName(clusterName);
instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
return instanceConfig;
}
......@@ -231,17 +235,9 @@ public class FunctionActioner {
}
}
public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
private void cleanupFunctionFiles(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
FunctionDetails details = functionMetaData.getFunctionDetails();
log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
instance.getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
functionRuntimeInfo.setRuntimeSpawner(null);
}
// clean up function package
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
......@@ -250,7 +246,7 @@ public class FunctionActioner {
if (pkgDir.exists()) {
try {
MoreFiles.deleteRecursively(
Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
} catch (IOException e) {
log.warn("Failed to delete package for function: {}",
FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
......@@ -258,14 +254,42 @@ public class FunctionActioner {
}
}
public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
FunctionDetails details = functionMetaData.getFunctionDetails();
log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
instance.getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
functionRuntimeInfo.setRuntimeSpawner(null);
}
cleanupFunctionFiles(functionRuntimeInfo);
}
public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.getFunctionDetails();
log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(),
functionRuntimeInfo.getFunctionInstance().getInstanceId());
FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
// cleanup any auth data cached
try {
functionRuntimeInfo.getRuntimeSpawner()
.getRuntimeFactory().getAuthProvider()
.cleanUpAuthData(
details.getTenant(), details.getNamespace(), details.getName(),
getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
} catch (Exception e) {
log.error("Failed to cleanup auth data for function: {}", fqfn, e);
}
functionRuntimeInfo.setRuntimeSpawner(null);
}
cleanupFunctionFiles(functionRuntimeInfo);
stopFunction(functionRuntimeInfo);
//cleanup subscriptions
if (details.getSource().getCleanupSubscription()) {
Map<String, Function.ConsumerSpec> consumerSpecMap = details.getSource().getInputSpecsMap();
......@@ -309,8 +333,8 @@ public class FunctionActioner {
SubscriptionStats sub = stats.subscriptions.get(InstanceUtils.getDefaultSubscriptionName(details));
if (sub != null) {
existingConsumers = sub.consumers.stream()
.map(consumerStats -> consumerStats.metadata)
.collect(Collectors.toList());
.map(consumerStats -> consumerStats.metadata)
.collect(Collectors.toList());
}
} catch (PulsarAdminException e1) {
......
......@@ -24,6 +24,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
......@@ -52,4 +53,8 @@ public class FunctionApiResource implements Supplier<WorkerService> {
? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
: null;
}
public AuthenticationDataHttps clientAuthData() {
return (AuthenticationDataHttps) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
}
}
......@@ -26,6 +26,7 @@ import static org.apache.pulsar.functions.utils.Reflections.createInstance;
import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
......@@ -43,6 +44,7 @@ import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
......@@ -77,6 +78,7 @@ import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
......@@ -93,6 +95,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
......@@ -292,7 +295,8 @@ public abstract class ComponentImpl {
final String functionPkgUrl,
final String functionDetailsJson,
final String componentConfigJson,
final String clientRole) {
final String clientRole,
AuthenticationDataHttps clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
......@@ -377,8 +381,30 @@ public abstract class ComponentImpl {
// function state
FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
.setFunctionDetails(functionDetails)
.setCreateTime(System.currentTimeMillis())
.setVersion(0);
// cache auth if need
if (clientAuthenticationDataHttps != null) {
try {
Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
.getRuntimeFactory()
.getAuthProvider()
.cacheAuthData(tenant, namespace, componentName, clientAuthenticationDataHttps);
if (functionAuthData.isPresent()) {
functionMetaDataBuilder.setFunctionAuthSpec(
Function.FunctionAuthenticationSpec.newBuilder()
.setData(ByteString.copyFrom(functionAuthData.get().getData()))
.build());
}
} catch (Exception e) {
log.error("Error caching authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
}
}
PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
try {
......@@ -494,6 +520,7 @@ public abstract class ComponentImpl {
String existingComponentConfigJson;
FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
if (componentType.equals(FUNCTION)) {
FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
......@@ -573,9 +600,9 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
}
// function state
FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
.setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
// merge from existing metadata
FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder().mergeFrom(existingComponent)
.setFunctionDetails(functionDetails);
PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) {
......@@ -590,6 +617,7 @@ public abstract class ComponentImpl {
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
updateRequest(functionMetaDataBuilder.build());
}
......
......@@ -90,7 +90,7 @@ public class FunctionsImplV2 {
uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String
functionDetailsJson, String functionConfigJson, String clientAppId) {
delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId, null);
return Response.ok().build();
}
......
......@@ -69,7 +69,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, null, functionConfigJson, clientAppId());
functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}
......
......@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
......@@ -34,7 +33,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
......@@ -62,7 +60,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
functionPkgUrl, null, sinkConfigJson, clientAppId());
functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}
@PUT
......
......@@ -60,7 +60,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @FormDataParam("sourceConfig") String sourceConfigJson) {
source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
functionPkgUrl, null, sourceConfigJson, clientAppId());
functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}
......
......@@ -48,7 +48,14 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@Slf4j
public class FunctionRuntimeManagerTest {
......@@ -663,7 +670,9 @@ public class FunctionRuntimeManagerTest {
public void testExternallyManagedRuntimeUpdate() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setKubernetesContainerFactory(new WorkerConfig.KubernetesContainerFactory());
workerConfig.setKubernetesContainerFactory(
new WorkerConfig.KubernetesContainerFactory()
.setSubmittingInsidePod(false));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setPulsarFunctionsCluster("cluster");
......@@ -679,8 +688,8 @@ public class FunctionRuntimeManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
KubernetesRuntimeFactory kubernetesRuntimeFactory = mock(KubernetesRuntimeFactory.class);
doNothing().when(kubernetesRuntimeFactory).setupClient();
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
......
......@@ -487,7 +487,7 @@ public class FunctionApiV3ResourceTest {
functionPkgUrl,
null,
new Gson().toJson(functionConfig),
null);
null, null);
}
......@@ -502,7 +502,7 @@ public class FunctionApiV3ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
null);
null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function already exists")
......@@ -1446,7 +1446,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
null, new Gson().toJson(functionConfig), null);
null, new Gson().toJson(functionConfig), null, null);
}
......@@ -1478,7 +1478,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
null, new Gson().toJson(functionConfig), null);
null, new Gson().toJson(functionConfig), null, null);
}
public static FunctionConfig createDefaultFunctionConfig() {
......
......@@ -420,7 +420,7 @@ public class SinkApiV3ResourceTest {
pkgUrl,
null,
new Gson().toJson(sinkConfig),
null);
null, null);
}
......@@ -435,7 +435,7 @@ public class SinkApiV3ResourceTest {
null,
null,
new Gson().toJson(sinkConfig),
null);
null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink already exists")
......@@ -529,7 +529,7 @@ public class SinkApiV3ResourceTest {
null,
null,
new Gson().toJson(sinkConfig),
null);
null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
......
......@@ -389,7 +389,7 @@ public class SourceApiV3ResourceTest {
pkgUrl,
null,
new Gson().toJson(sourceConfig),
null);
null, null);
}
......@@ -404,7 +404,7 @@ public class SourceApiV3ResourceTest {
null,
null,
new Gson().toJson(sourceConfig),
null);
null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source already exists")
......@@ -498,7 +498,7 @@ public class SourceApiV3ResourceTest {
null,
null,
new Gson().toJson(sourceConfig),
null);
null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册