提交 52af2020 编写于 作者: B Boyang Jerry Peng 提交者: Jia Zhai

Use classloaders to load Java functions (#4685)

* Use classloading to load use code for functions
(cherry picked from commit 6ff1bbae)
上级 d71ea865
......@@ -283,6 +283,8 @@ OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*"
......
......@@ -156,12 +156,6 @@
<version>${project.version}</version>
<!-- make sure the api examples are compiled before assembly -->
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- local-runner -->
......
......@@ -131,6 +131,8 @@
<exclude>io.netty:netty-transport-native-epoll</exclude>
<exclude>io.netty:netty-transport-native-unix-common</exclude>
<exclude>org.apache.pulsar:pulsar-functions-runtime-all</exclude>
<!-- Already included in pulsar-zookeeper instrumented jar -->
<exclude>org.apache.zookeeper:zookeeper</exclude>
......
......@@ -50,7 +50,15 @@ class ReflectionUtils {
@SuppressWarnings("unchecked")
static <T> Class<T> newClassInstance(String className) {
try {
return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
try {
// when the API is loaded in the same classloader as the impl
return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
} catch (Exception e) {
// when the API is loaded in a separate classloader as the impl
// the classloader that loaded the impl needs to be a child classloader of the classloader
// that loaded the API
return (Class<T>) Thread.currentThread().getContextClassLoader().loadClass(className);
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
......
......@@ -90,8 +90,6 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
@Slf4j
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// The class loader that used for loading functions
private ClassLoader fnClassLoader;
private final InstanceConfig instanceConfig;
private final FunctionCacheManager fnCache;
private final String jarFile;
......@@ -132,6 +130,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final Map<String, String> properties;
private final ClassLoader instanceClassLoader;
private ClassLoader functionClassLoader;
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
......@@ -166,12 +167,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
this.collectorRegistry = collectorRegistry;
this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
}
/**
* NOTE: this method should be called in the instance thread, in order to make class loading work.
*/
JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
JavaInstance setupJavaInstance() throws Exception {
// initialize the thread context
ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
......@@ -181,18 +184,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
// start the function thread
loadJars();
functionClassLoader = loadJars();
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object object = Reflections.createInstance(
instanceConfig.getFunctionDetails().getClassName(),
clsLoader);
functionClassLoader);
if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}
// start the state table
setupStateTable();
ContextImpl contextImpl = setupContext();
// start the output producer
setupOutput(contextImpl);
// start the input consumer
......@@ -225,8 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.instanceCache.getScheduledExecutorService(),
this.componentType);
ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
javaInstance = setupJavaInstance();
if (null != stateTable) {
StateContextImpl stateContext = new StateContextImpl(stateTable);
javaInstance.getContext().setStateContext(stateContext);
......@@ -254,7 +259,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
stats.processTimeStart();
// process the message
Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
Thread.currentThread().setContextClassLoader(instanceClassLoader);
// register end time
stats.processTimeEnd();
......@@ -289,7 +296,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
private void loadJars() throws Exception {
private ClassLoader loadJars() throws Exception {
ClassLoader fnClassLoader;
try {
log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
......@@ -309,13 +317,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
log.info("Initialize function class loader for function {} at function cache manager",
instanceConfig.getFunctionDetails().getName());
this.fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}
// make sure the function class loader is accessible thread-locally
Thread.currentThread().setContextClassLoader(fnClassLoader);
return fnClassLoader;
}
private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
......@@ -425,23 +432,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
private void sendOutputMessage(Record srcRecord, Object output) {
if (!(this.sink instanceof PulsarSink)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
throw new RuntimeException(e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
}
private Record readInput() {
Record record;
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
record = this.source.read();
} catch (Exception e) {
stats.incrSourceExceptions(e);
log.info("Encountered exception in source read: ", e);
throw new RuntimeException(e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
// check record is valid
......@@ -466,19 +483,29 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
if (source != null) {
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
source.close();
} catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
source = null;
}
if (sink != null) {
if (!(this.sink instanceof PulsarSink)) {
Thread.currentThread().setContextClassLoader(functionClassLoader);
}
try {
sink.close();
} catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
} finally {
Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
sink = null;
}
......@@ -667,11 +694,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
Thread.currentThread().getContextClassLoader());
this.functionClassLoader);
}
Class<?>[] typeArgs;
......@@ -683,11 +710,22 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
this.source = (Source<?>) object;
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
new TypeToken<Map<String, Object>>(){}.getType()), contextImpl);
if (!(this.source instanceof PulsarSource)) {
Thread.currentThread().setContextClassLoader(this.functionClassLoader);
}
try {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
new TypeToken<Map<String, Object>>() {
}.getType()), contextImpl);
}
} catch (Exception e) {
log.error("Source open produced uncaught exception: ", e);
throw e;
} finally {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
......@@ -713,12 +751,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats);
object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
}
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
Thread.currentThread().getContextClassLoader());
this.functionClassLoader);
}
if (object instanceof Sink) {
......@@ -726,11 +764,23 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
if (sinkSpec.getConfigs().isEmpty()) {
this.sink.open(new HashMap<>(), contextImpl);
} else {
this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
new TypeToken<Map<String, Object>>() {}.getType()), contextImpl);
if (!(this.sink instanceof PulsarSink)) {
Thread.currentThread().setContextClassLoader(this.functionClassLoader);
}
try {
if (sinkSpec.getConfigs().isEmpty()) {
this.sink.open(new HashMap<>(), contextImpl);
} else {
this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
new TypeToken<Map<String, Object>>() {
}.getType()), contextImpl);
}
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
throw e;
} finally {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
}
......@@ -58,6 +58,7 @@ public class PulsarSink<T> implements Sink<T> {
private final PulsarClient client;
private final PulsarSinkConfig pulsarSinkConfig;
private final Map<String, String> properties;
private final ClassLoader functionClassLoader;
private ComponentStatsManager stats;
@VisibleForTesting
......@@ -237,12 +238,14 @@ public class PulsarSink<T> implements Sink<T> {
}
}
public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats) {
public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties,
ComponentStatsManager stats, ClassLoader functionClassLoader) {
this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig;
this.topicSchema = new TopicSchema(client);
this.properties = properties;
this.stats = stats;
this.functionClassLoader = functionClassLoader;
}
@Override
......@@ -314,9 +317,7 @@ public class PulsarSink<T> implements Sink<T> {
return (Schema<T>) Schema.BYTES;
}
Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(),
Thread.currentThread().getContextClassLoader());
Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
if (Void.class.equals(typeArg)) {
// return type is 'void', so there's no schema to check
return null;
......
......@@ -45,15 +45,18 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private final PulsarClient pulsarClient;
private final PulsarSourceConfig pulsarSourceConfig;
private final Map<String, String> properties;
private final ClassLoader functionClassLoader;
private List<String> inputTopics;
private List<Consumer<T>> inputConsumers = Collections.emptyList();
private final TopicSchema topicSchema;
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties,
ClassLoader functionClassLoader) {
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarConfig;
this.topicSchema = new TopicSchema(pulsarClient);
this.properties = properties;
this.functionClassLoader = functionClassLoader;
}
@Override
......@@ -147,7 +150,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
Map<String, ConsumerConfig<T>> configs = new TreeMap<>();
Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
Thread.currentThread().getContextClassLoader());
this.functionClassLoader);
checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
......
......@@ -166,7 +166,7 @@ public class PulsarSinkTest {
PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(Void.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
Schema schema = pulsarSink.initializeSchema();
......@@ -184,7 +184,7 @@ public class PulsarSinkTest {
// set type to be inconsistent to that of SerDe
pulsarConfig.setTypeClassName(Integer.class.getName());
pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
fail("Should fail constructing java instance if function type is inconsistent with serde type");
......@@ -206,7 +206,7 @@ public class PulsarSinkTest {
PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
......@@ -225,7 +225,7 @@ public class PulsarSinkTest {
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
......@@ -241,7 +241,7 @@ public class PulsarSinkTest {
// set type to void
pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
......@@ -263,7 +263,7 @@ public class PulsarSinkTest {
/** test At-least-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
......@@ -309,7 +309,8 @@ public class PulsarSinkTest {
/** test At-most-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
......@@ -351,7 +352,8 @@ public class PulsarSinkTest {
/** test Effectively-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
......
......@@ -127,7 +127,7 @@ public class PulsarSourceTest {
pulsarConfig.setTypeClassName(Void.class.getName());
@Cleanup
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
......@@ -155,7 +155,7 @@ public class PulsarSourceTest {
pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
@Cleanup
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
fail("Should fail constructing java instance if function type is inconsistent with serde type");
......@@ -182,7 +182,7 @@ public class PulsarSourceTest {
pulsarConfig.setTopicSchema(consumerConfigs);
@Cleanup
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
}
......@@ -197,7 +197,7 @@ public class PulsarSourceTest {
pulsarConfig.setTopicSchema(consumerConfigs);
@Cleanup
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
pulsarSource.setupConsumerConfigs();
}
......
......@@ -25,7 +25,6 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
......@@ -61,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
import static org.apache.pulsar.functions.utils.FunctionCommon.extractClassLoader;
import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
@Slf4j
public class LocalRunner {
......@@ -407,7 +405,7 @@ public class LocalRunner {
serviceUrl,
stateStorageServiceUrl,
authConfig,
new ClearTextSecretsProvider(), null);
new ClearTextSecretsProvider(), null, null);
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
......
......@@ -35,39 +35,43 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-runtime</artifactId>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.parent.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
</dependencies>
......@@ -75,333 +79,23 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>java-instance</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>shade</goal>
<goal>single</goal>
</goals>
<configuration>
<finalName>java-instance</finalName>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
</transformers>
<artifactSet>
<excludes>
<exclude>io.netty:netty-common</exclude>
<exclude>io.netty:netty-buffer</exclude>
<exclude>io.netty:netty-codec-http2</exclude>
<exclude>io.netty:netty-codec-http</exclude>
<exclude>io.netty:netty-codec-socks</exclude>
<exclude>io.netty:netty-codec</exclude>
<exclude>io.netty:netty-handler</exclude>
<exclude>io.netty:netty-handler-proxy</exclude>
<exclude>io.netty:netty-transport</exclude>
<exclude>io.netty:netty-resolver</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Shading signed JARs will fail without
this. http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.typesafe.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.http</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.jute</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.jute</shadedPattern>
</relocation>
<relocation>
<pattern>javax.servlet</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.servlet</shadedPattern>
</relocation>
<relocation>
<pattern>org.junit</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.junit</shadedPattern>
</relocation>
<relocation>
<pattern>junit</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.junit</shadedPattern>
</relocation>
<relocation>
<pattern>net.jodah</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jodah</shadedPattern>
</relocation>
<relocation>
<pattern>org.lz4</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.lz4</shadedPattern>
</relocation>
<relocation>
<pattern>org.reactivestreams</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.reactivestreams</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>io.swagger</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
</relocation>
<relocation>
<pattern>org.yaml</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.yaml</shadedPattern>
</relocation>
<relocation>
<pattern>org.jctools</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.jctools</shadedPattern>
</relocation>
<relocation>
<pattern>com.squareup.okhttp</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okhttp</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.grpc</shadedPattern>
</relocation>
<relocation>
<pattern>org.joda</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.joda</shadedPattern>
</relocation>
<relocation>
<pattern>javax.ws.rs</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.ws.rs</shadedPattern>
</relocation>
<relocation>
<pattern>io.kubernetes</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.kubernetes</shadedPattern>
</relocation>
<relocation>
<pattern>io.opencensus</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.opencensus</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>org.aspectj</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.aspectj</shadedPattern>
</relocation>
<relocation>
<pattern>commons-configuration</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-configuration</shadedPattern>
</relocation>
<relocation>
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.tukaani</shadedPattern>
</relocation>
<relocation>
<pattern>com.github</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github</shadedPattern>
</relocation>
<relocation>
<pattern>commons-io</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-io</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.distributedlog</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
</relocation>
<relocation>
<pattern>org.inferred</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.bookkeeper</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</shadedPattern>
</relocation>
<relocation>
<pattern>org.bookkeeper</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bookkeeper</shadedPattern>
</relocation>
<relocation>
<pattern>dlshade</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>net.java.dev.jna</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.curator</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.curator</shadedPattern>
</relocation>
<relocation>
<pattern>javax.validation</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.validation</shadedPattern>
</relocation>
<relocation>
<pattern>javax.activation</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.activation</shadedPattern>
</relocation>
<relocation>
<pattern>io.prometheus</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.zookeeper</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper</shadedPattern>
</relocation>
<relocation>
<pattern>io.jsonwebtoken</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.jsonwebtoken</shadedPattern>
</relocation>
<relocation>
<pattern>commons-codec</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-codec</shadedPattern>
</relocation>
<relocation>
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.thoughtworks.paranamer</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus.mojo</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.mojo</shadedPattern>
</relocation>
<relocation>
<pattern>com.github.luben</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github.luben</shadedPattern>
</relocation>
<relocation>
<pattern>jline</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.jline</shadedPattern>
</relocation>
<relocation>
<pattern>commons-logging</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-logging</shadedPattern>
</relocation>
<relocation>
<pattern>org.bouncycastle</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bouncycastle</shadedPattern>
</relocation>
<relocation>
<pattern>org.xerial.snappy</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.xerial.snappy</shadedPattern>
</relocation>
<relocation>
<pattern>javax.annotation</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.annotation</shadedPattern>
</relocation>
<relocation>
<pattern>org.checkerframework</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.checkerframework</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.yetus</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.yetus</shadedPattern>
</relocation>
<relocation>
<pattern>commons-cli</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-cli</shadedPattern>
</relocation>
<relocation>
<pattern>commons-lang</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-lang</shadedPattern>
</relocation>
<relocation>
<pattern>com.squareup.okio</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okio</shadedPattern>
</relocation>
<relocation>
<pattern>org.rocksdb</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.rocksdb</shadedPattern>
</relocation>
<relocation>
<pattern>org.objenesis</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.objenesis</shadedPattern>
</relocation>
<relocation>
<pattern>org.eclipse.jetty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.eclipse.jetty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.avro</shadedPattern>
</relocation>
<relocation>
<pattern>avro.shaded</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.avo.shaded</shadedPattern>
</relocation>
<relocation>
<pattern>com.yahoo</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.yahoo</shadedPattern>
</relocation>
<relocation>
<pattern>com.beust</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
</relocation>
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>org.hamcrest</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
</relocation>
<relocation>
<pattern>aj.org</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.aj.org</shadedPattern>
</relocation>
<relocation>
<pattern>com.scurrilous</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.scurrilous</shadedPattern>
</relocation>
<relocation>
<pattern>okio</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.okio</shadedPattern>
</relocation>
<!--
asynchttpclient can only be shaded to be under `org.apache.pulsar.shade`
see {@link https://github.com/apache/incubator-pulsar/pull/390}
and {@link https://github.com/apache/incubator-pulsar/blob/master/pulsar-client/src/main/resources/ahc.properties}
-->
<relocation>
<pattern>org.asynchttpclient</pattern>
<shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
</relocation>
<!-- DONT ever shade log4j, otherwise logging won't work anymore in running functions in process mode
<relocation>
<pattern>org.apache.logging</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.logging</shadedPattern>
</relocation>
-->
<relocation>
<pattern>io.swagger</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;
import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
* This is the initial class that gets called when starting a Java Function instance.
* Multiple class loaders are used to separate function instance dependencies from user code dependencies
* This class will create three classloaders:
* 1. The root classloader that will share interfaces between the function instance
* classloader and user code classloader. This classloader will contain the following dependencies
* - pulsar-functions-api
* - pulsar-client-api
* - log4j-slf4j-impl
* - slf4j-api
* - log4j-core
* - log4j-api
*
* 2. The Function instance classloader, a child of the root classloader, that loads all pulsar broker/worker dependencies
* 3. The user code classloader, a child of the root classloader, that loads all user code dependencies
*
* This class should not use any other dependencies!
*
*/
public class JavaInstanceMain {
private static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
public JavaInstanceMain() { }
public static void main(String[] args) throws Exception {
// Set root classloader to current classpath
ClassLoader root = Thread.currentThread().getContextClassLoader();
// Get classpath for function instance
String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
if (functionInstanceClasspath == null) {
throw new IllegalArgumentException("Propery " + FUNCTIONS_INSTANCE_CLASSPATH + " is not set!");
}
List<File> files = new LinkedList<>();
for (String entry: functionInstanceClasspath.split(":")) {
if (isBlank(entry)) {
continue;
}
// replace any asterisks i.e. wildcards as they don't work with url classloader
File f = new File(entry.replace("*", ""));
if (f.exists()) {
if (f.isDirectory()) {
files.addAll(Arrays.asList(f.listFiles()));
} else {
files.add(new File(entry));
}
} else {
System.out.println(String.format("[WARN] %s on functions instance classpath does not exist", f.getAbsolutePath()));
}
}
ClassLoader functionInstanceClsLoader = loadJar(root, files.toArray(new File[files.size()]));
System.out.println("Using function root classloader: " + root);
System.out.println("Using function instance classloader: " + functionInstanceClsLoader);
// use the function instance classloader to create org.apache.pulsar.functions.runtime.JavaInstanceStarter
Object main = createInstance("org.apache.pulsar.functions.runtime.JavaInstanceStarter", functionInstanceClsLoader);
// Invoke start method of JavaInstanceStarter to start the function instance code
Method method = main.getClass().getDeclaredMethod("start", String[].class, ClassLoader.class, ClassLoader.class);
System.out.println("Starting function instance...");
method.invoke(main, args, functionInstanceClsLoader, root);
}
public static Object createInstance(String userClassName,
ClassLoader classLoader) {
Class<?> theCls;
try {
theCls = Class.forName(userClassName, true, classLoader);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("Class " + userClassName + " must be in class path", cnfe);
}
Object result;
try {
Constructor<?> meth = theCls.getDeclaredConstructor();
meth.setAccessible(true);
result = meth.newInstance();
} catch (InstantiationException ie) {
throw new RuntimeException("User class must be concrete", ie);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Class " + userClassName + " doesn't have such method", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Class " + userClassName + " must have a no-arg constructor", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Class " + userClassName + " constructor throws exception", e);
}
return result;
}
public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
URL[] urls = new URL[jars.length];
for (int i = 0; i < jars.length; i++) {
urls[i] = jars[i].toURI().toURL();
}
return new URLClassLoader(urls, parent);
}
public static boolean isBlank(String str) {
int strLen;
if (str != null && (strLen = str.length()) != 0) {
for(int i = 0; i < strLen; ++i) {
if (!Character.isWhitespace(str.charAt(i))) {
return false;
}
}
return true;
} else {
return true;
}
}
}
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<Configuration>
<name>pulsar-functions-instance</name>
<monitorInterval>30</monitorInterval>
<Properties>
<Property>
<name>pulsar.log.appender</name>
<value>RollingFile</value>
</Property>
<Property>
<name>pulsar.log.level</name>
<value>info</value>
</Property>
<Property>
<name>bk.log.level</name>
<value>info</value>
</Property>
</Properties>
<Appenders>
<Console>
<name>Console</name>
<target>SYSTEM_OUT</target>
<PatternLayout>
<Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
</Console>
<RollingFile>
<name>RollingFile</name>
<fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log</fileName>
<filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
<immediateFlush>true</immediateFlush>
<PatternLayout>
<Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy>
<interval>1</interval>
<modulate>true</modulate>
</TimeBasedTriggeringPolicy>
<SizeBasedTriggeringPolicy>
<size>1 GB</size>
</SizeBasedTriggeringPolicy>
<CronTriggeringPolicy>
<schedule>0 0 0 * * ?</schedule>
</CronTriggeringPolicy>
</Policies>
<DefaultRolloverStrategy>
<Delete>
<basePath>${sys:pulsar.function.log.dir}</basePath>
<maxDepth>2</maxDepth>
<IfFileName>
<glob>*/${sys:pulsar.function.log.file}*log.gz</glob>
</IfFileName>
<IfLastModified>
<age>30d</age>
</IfLastModified>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
<RollingRandomAccessFile>
<name>BkRollingFile</name>
<fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk</fileName>
<filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
<immediateFlush>true</immediateFlush>
<PatternLayout>
<Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy>
<interval>1</interval>
<modulate>true</modulate>
</TimeBasedTriggeringPolicy>
<SizeBasedTriggeringPolicy>
<size>1 GB</size>
</SizeBasedTriggeringPolicy>
<CronTriggeringPolicy>
<schedule>0 0 0 * * ?</schedule>
</CronTriggeringPolicy>
</Policies>
<DefaultRolloverStrategy>
<Delete>
<basePath>${sys:pulsar.function.log.dir}</basePath>
<maxDepth>2</maxDepth>
<IfFileName>
<glob>*/${sys:pulsar.function.log.file}.bk*log.gz</glob>
</IfFileName>
<IfLastModified>
<age>30d</age>
</IfLastModified>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<Logger>
<name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
<level>${sys:bk.log.level}</level>
<additivity>false</additivity>
<AppenderRef>
<ref>BkRollingFile</ref>
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
</AppenderRef>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<Configuration>
<name>pulsar-functions-kubernetes-instance</name>
<monitorInterval>30</monitorInterval>
<Properties>
<Property>
<name>pulsar.log.level</name>
<value>info</value>
</Property>
<Property>
<name>bk.log.level</name>
<value>info</value>
</Property>
</Properties>
<Appenders>
<Console>
<name>Console</name>
<target>SYSTEM_OUT</target>
<PatternLayout>
<Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
</PatternLayout>
</Console>
</Appenders>
<Loggers>
<Logger>
<name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
<level>${sys:bk.log.level}</level>
<additivity>false</additivity>
<AppenderRef>
<ref>Console</ref>
</AppenderRef>
</Logger>
<Root>
<level>info</level>
<AppenderRef>
<ref>Console</ref>
<level>${sys:pulsar.log.level}</level>
</AppenderRef>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
......@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
......@@ -50,72 +50,70 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* A function container implemented using java thread.
*/
@Slf4j
public class JavaInstanceMain implements AutoCloseable {
public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--function_details", description = "Function details json\n", required = true)
protected String functionDetailsJsonString;
public String functionDetailsJsonString;
@Parameter(
names = "--jar",
description = "Path to Jar\n",
listConverter = StringConverter.class)
protected String jarFile;
public String jarFile;
@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
protected int instanceId;
public int instanceId;
@Parameter(names = "--function_id", description = "Function Id\n", required = true)
protected String functionId;
public String functionId;
@Parameter(names = "--function_version", description = "Function Version\n", required = true)
protected String functionVersion;
public String functionVersion;
@Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
protected String pulsarServiceUrl;
public String pulsarServiceUrl;
@Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
protected String clientAuthenticationPlugin;
public String clientAuthenticationPlugin;
@Parameter(names = "--client_auth_params", description = "Client auth param\n")
protected String clientAuthenticationParameters;
public String clientAuthenticationParameters;
@Parameter(names = "--use_tls", description = "Use tls connection\n")
protected String useTls = Boolean.FALSE.toString();
public String useTls = Boolean.FALSE.toString();
@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
protected String tlsAllowInsecureConnection = Boolean.TRUE.toString();
public String tlsAllowInsecureConnection = Boolean.TRUE.toString();
@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
protected String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;
public String tlsTrustCertFilePath;
@Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false)
protected String stateStorageServiceUrl;
public String stateStorageServiceUrl;
@Parameter(names = "--port", description = "Port to listen on\n", required = true)
protected int port;
public int port;
@Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
protected int metrics_port;
public int metrics_port;
@Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
protected int maxBufferedTuples;
public int maxBufferedTuples;
@Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
protected int expectedHealthCheckInterval;
public int expectedHealthCheckInterval;
@Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
protected String secretsProviderClassName;
public String secretsProviderClassName;
@Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false)
protected String secretsProviderConfig;
public String secretsProviderConfig;
@Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
protected String clusterName;
public String clusterName;
private Server server;
private RuntimeSpawner runtimeSpawner;
......@@ -124,17 +122,22 @@ public class JavaInstanceMain implements AutoCloseable {
private HTTPServer metricsServer;
private ScheduledFuture healthCheckTimer;
public JavaInstanceMain() { }
public JavaInstanceStarter() { }
public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassLoader rootClassLoader) throws Exception {
Thread.currentThread().setContextClassLoader(functionInstanceClassLoader);
JCommander jcommander = new JCommander(this);
// parse args by JCommander
jcommander.parse(args);
public void start() throws Exception {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionId(functionId);
instanceConfig.setFunctionVersion(functionVersion);
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
instanceConfig.setClusterName(clusterName);
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
}
......@@ -142,7 +145,7 @@ public class JavaInstanceMain implements AutoCloseable {
functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
FunctionDetails functionDetails = functionDetailsBuilder.build();
Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
......@@ -164,7 +167,7 @@ public class JavaInstanceMain implements AutoCloseable {
SecretsProvider secretsProvider;
try {
secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, functionInstanceClassLoader);
} catch (Exception e) {
throw new RuntimeException(e);
}
......@@ -180,7 +183,7 @@ public class JavaInstanceMain implements AutoCloseable {
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
secretsProvider, collectorRegistry);
secretsProvider, collectorRegistry, rootClassLoader);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
......@@ -234,16 +237,6 @@ public class JavaInstanceMain implements AutoCloseable {
return Boolean.TRUE.toString().equals(param);
}
public static void main(String[] args) throws Exception {
JavaInstanceMain javaInstanceMain = new JavaInstanceMain();
JCommander jcommander = new JCommander(javaInstanceMain);
jcommander.setProgramName("JavaInstanceMain");
// parse args by JCommander
jcommander.parse(args);
javaInstanceMain.start();
}
@Override
public void close() {
try {
......@@ -309,7 +302,7 @@ public class JavaInstanceMain implements AutoCloseable {
@Override
public void getMetrics(com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
......@@ -324,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable {
}
public void resetMetrics(com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
......
......@@ -185,7 +185,7 @@ public class KubernetesRuntime implements Runtime {
}
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
logConfigFile = "kubernetes_instance_log4j2.yml";
logConfigFile = "kubernetes_instance_log4j2.xml";
break;
case PYTHON:
logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
......
......@@ -97,7 +97,7 @@ class ProcessRuntime implements Runtime {
}
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
logConfigFile = "java_instance_log4j2.yml";
logConfigFile = "java_instance_log4j2.xml";
break;
case PYTHON:
logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
......
......@@ -39,7 +39,6 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
......@@ -51,6 +50,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class RuntimeUtils {
private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
public static List<String> composeCmd(InstanceConfig instanceConfig,
String instanceFile,
......@@ -255,17 +255,25 @@ public class RuntimeUtils {
args.add("-cp");
String classpath = instanceFile;
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
classpath = classpath + ":" + extraDependenciesDir + "/*";
}
args.add(classpath);
// Keep the same env property pointing to the Java instance file so that it can be picked up
// by the child process and manually added to classpath
args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
}
// add complete classpath for broker/worker so that the function instance can load
// the functions instance dependencies separately from user code dependencies
String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
if (functionInstanceClasspath == null) {
log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH);
functionInstanceClasspath = System.getProperty("java.class.path");
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClasspath));
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
......@@ -283,7 +291,8 @@ public class RuntimeUtils {
args.add("-Xmx" + String.valueOf(resources.getRam()));
}
}
args.add(JavaInstanceMain.class.getName());
args.add("org.apache.pulsar.functions.instance.JavaInstanceMain");
args.add("--jar");
args.add(originalCodeFileName);
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
......
......@@ -20,12 +20,8 @@
package org.apache.pulsar.functions.runtime;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.CollectorRegistry;
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
......@@ -36,6 +32,13 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* Thread based function container factory implementation.
*/
......@@ -51,21 +54,36 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
AuthenticationConfig authConfig, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws Exception {
this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
AuthenticationConfig authConfig, SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
}
@VisibleForTesting
public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) {
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
ClassLoader rootClassLoader) {
if (rootClassLoader == null) {
rootClassLoader = Thread.currentThread().getContextClassLoader();
}
this.secretsProvider = secretsProvider;
this.fnCache = new FunctionCacheManagerImpl();
this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarClient = pulsarClient;
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
}
public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
URL[] urls = new URL[jars.length];
for (int i = 0; i < jars.length; i++) {
urls[i] = jars[i].toURI().toURL();
}
return new URLClassLoader(urls, parent);
}
private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
throws PulsarClientException {
ClientBuilder clientBuilder = null;
......
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Configuration:
name: pulsar-functions-instance
monitorInterval: 30
Properties:
Property:
- name: "pulsar.log.appender"
value: "RollingFile"
- name: "pulsar.log.level"
value: "info"
- name: "bk.log.level"
value: "info"
Appenders:
# Console
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
# Rolling file appender configuration
RollingFile:
name: RollingFile
fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log"
filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
# Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete file older than 30days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.function.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.function.log.file}*log.gz"
IfLastModified:
age: 30d
# Rolling file appender configuration for bk
RollingRandomAccessFile:
name: BkRollingFile
fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk"
filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
TimeBasedTriggeringPolicy:
interval: 1
modulate: true
SizeBasedTriggeringPolicy:
size: 1 GB
# Trigger every day at midnight that also scan
# roll-over strategy that deletes older file
CronTriggeringPolicy:
schedule: "0 0 0 * * ?"
# Delete file older than 30days
DefaultRolloverStrategy:
Delete:
basePath: ${sys:pulsar.function.log.dir}
maxDepth: 2
IfFileName:
glob: "*/${sys:pulsar.function.log.file}.bk*log.gz"
IfLastModified:
age: 30d
Loggers:
Logger:
name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
level: "${sys:bk.log.level}"
additivity: false
AppenderRef:
- ref: BkRollingFile
Root:
level: info
AppenderRef:
- ref: "${sys:pulsar.log.appender}"
level: "${sys:pulsar.log.level}"
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Configuration:
name: pulsar-functions-kubernetes-instance
monitorInterval: 30
Properties:
Property:
- name: "pulsar.log.level"
value: "info"
- name: "bk.log.level"
value: "info"
Appenders:
# Console
Console:
name: Console
target: SYSTEM_OUT
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Loggers:
Logger:
name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
level: "${sys:bk.log.level}"
additivity: false
AppenderRef:
- ref: Console
Root:
level: info
AppenderRef:
- ref: Console
level: "${sys:pulsar.log.level}"
......@@ -34,6 +34,7 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
......@@ -43,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
......@@ -135,6 +137,11 @@ public class KubernetesRuntimeTest {
this.logDirectory = "logs/functions";
}
@BeforeClass
public void setup() {
System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
}
@AfterMethod
public void tearDown() {
if (null != this.factory) {
......@@ -288,13 +295,13 @@ public class KubernetesRuntimeTest {
"Actual args : " + StringUtils.join(args, " "));
String expectedArgs = "exec java -cp " + classpath
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Xmx" + String.valueOf(RESOURCES.getRam())
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + pulsarRootDir + "/" + userJarFile + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
......
......@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.runtime;
import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
......@@ -46,6 +47,7 @@ import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
......@@ -123,6 +125,11 @@ public class ProcessRuntimeTest {
this.logDirectory = "Users/user/logs";
}
@BeforeClass
public void setup() {
System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
}
@AfterMethod
public void tearDown() {
if (null != this.factory) {
......@@ -266,12 +273,12 @@ public class ProcessRuntimeTest {
}
String expectedArgs = "java -cp " + classpath
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
+ " -Dlog4j.configurationFile=java_instance_log4j2.yml "
+ " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ " -Dlog4j.configurationFile=java_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
+ config.getInstanceId() + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
......
......@@ -61,8 +61,9 @@ public class FunctionCacheEntry implements AutoCloseable {
FunctionCacheEntry(Collection<String> requiredJarFiles,
Collection<URL> requiredClasspaths,
URL[] libraryURLs,
String initialInstanceId) {
this.classLoader = FunctionClassLoaders.create(libraryURLs, FunctionClassLoaders.class.getClassLoader());
String initialInstanceId, ClassLoader rootClassLoader) {
this.classLoader = FunctionClassLoaders.create(libraryURLs, rootClassLoader);
this.classpaths = requiredClasspaths.stream()
.map(URL::toString)
.collect(Collectors.toSet());
......@@ -70,17 +71,8 @@ public class FunctionCacheEntry implements AutoCloseable {
this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
}
private static final Set<String> JAVA_INSTANCE_ADDITIONAL_JARS = isNoneBlank(
System.getProperty(JAVA_INSTANCE_JAR_PROPERTY))
? Collections.singleton(System.getProperty(JAVA_INSTANCE_JAR_PROPERTY))
: Collections.emptySet();
FunctionCacheEntry(String narArchive, String initialInstanceId) throws IOException {
if (JAVA_INSTANCE_ADDITIONAL_JARS.isEmpty()) {
log.warn("java-instance jar path not set in system-property= {} ", JAVA_INSTANCE_JAR_PROPERTY);
throw new IllegalStateException(JAVA_INSTANCE_JAR_PROPERTY + " system property not set");
}
this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), JAVA_INSTANCE_ADDITIONAL_JARS);
FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException {
this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader);
this.classpaths = Collections.emptySet();
this.jarFiles = Collections.singleton(narArchive);
this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
......
......@@ -37,8 +37,11 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
/** Registered Functions **/
private final Map<String, FunctionCacheEntry> cacheFunctions;
public FunctionCacheManagerImpl() {
private ClassLoader rootClassLoader;
public FunctionCacheManagerImpl(ClassLoader rootClassLoader) {
this.cacheFunctions = new ConcurrentHashMap<>();
this.rootClassLoader = rootClassLoader;
}
Map<String, FunctionCacheEntry> getCacheFunctions() {
......@@ -93,7 +96,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
requiredJarFiles,
requiredClasspaths,
urls,
eid));
eid, rootClassLoader));
} catch (Throwable cause) {
Exceptions.rethrowIOException(cause);
}
......@@ -122,7 +125,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
// Create new cache entry
try {
cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid));
cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader));
} catch (Throwable cause) {
Exceptions.rethrowIOException(cause);
}
......
......@@ -140,7 +140,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
workerConfig.getStateStorageServiceUrl(),
authConfig,
new ClearTextSecretsProvider(),
null);
null, null);
} else if (workerConfig.getProcessContainerFactory() != null) {
this.runtimeFactory = new ProcessRuntimeFactory(
workerConfig.getPulsarServiceUrl(),
......
......@@ -143,7 +143,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -189,7 +189,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -236,7 +236,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -296,7 +296,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -362,7 +362,7 @@ public class SchedulerManagerTest {
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider
(), new CollectorRegistry());
(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -473,7 +473,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -600,7 +600,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
......@@ -654,7 +654,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......@@ -787,7 +787,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册