diff --git a/bin/pulsar b/bin/pulsar
index ad7736a08ea283c24940a14c3d60fb62d324b02a..fd90aa13123ff53bdf5519b75545e9f9e24c46dd 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -283,6 +283,8 @@ OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
+OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
+
ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*"
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index fdd2102b115440e9b78fc580a0991135d7ff9bfe..9281d8494e0f3dd127cb888c96aa093c873f528f 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -156,12 +156,6 @@
${project.version}
provided
-
-
- io.grpc
- *
-
-
diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index c2d5996506587787031fa6957a9fdd27b4e298a4..bb3e2736cc338c0277537f119f8b6df6b119c1d6 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -131,6 +131,8 @@
io.netty:netty-transport-native-epoll
io.netty:netty-transport-native-unix-common
+ org.apache.pulsar:pulsar-functions-runtime-all
+
org.apache.zookeeper:zookeeper
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
index 4d9db6edd2702576c10486f26fb15765d67c690e..101c77d80993c82f3df6f70b0cb8252ef3d54c2b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
@@ -50,7 +50,15 @@ class ReflectionUtils {
@SuppressWarnings("unchecked")
static Class newClassInstance(String className) {
try {
- return (Class) DefaultImplementation.class.getClassLoader().loadClass(className);
+ try {
+ // when the API is loaded in the same classloader as the impl
+ return (Class) DefaultImplementation.class.getClassLoader().loadClass(className);
+ } catch (Exception e) {
+ // when the API is loaded in a separate classloader as the impl
+ // the classloader that loaded the impl needs to be a child classloader of the classloader
+ // that loaded the API
+ return (Class) Thread.currentThread().getContextClassLoader().loadClass(className);
+ }
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b6c4fc86d69ea4d088baaf7ffd6a2a3b0b0cf2a5..66d4ba7c3ac79f11a928edbfe95f3fe39eeb03f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -90,8 +90,6 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
@Slf4j
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
- // The class loader that used for loading functions
- private ClassLoader fnClassLoader;
private final InstanceConfig instanceConfig;
private final FunctionCacheManager fnCache;
private final String jarFile;
@@ -132,6 +130,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final Map properties;
+ private final ClassLoader instanceClassLoader;
+ private ClassLoader functionClassLoader;
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
@@ -166,12 +167,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
this.collectorRegistry = collectorRegistry;
+
+ this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
}
/**
* NOTE: this method should be called in the instance thread, in order to make class loading work.
*/
- JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
+ JavaInstance setupJavaInstance() throws Exception {
// initialize the thread context
ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
@@ -181,18 +184,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
// start the function thread
- loadJars();
+ functionClassLoader = loadJars();
- ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object object = Reflections.createInstance(
instanceConfig.getFunctionDetails().getClassName(),
- clsLoader);
+ functionClassLoader);
+
if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}
// start the state table
setupStateTable();
+
+ ContextImpl contextImpl = setupContext();
+
// start the output producer
setupOutput(contextImpl);
// start the input consumer
@@ -225,8 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.instanceCache.getScheduledExecutorService(),
this.componentType);
- ContextImpl contextImpl = setupContext();
- javaInstance = setupJavaInstance(contextImpl);
+ javaInstance = setupJavaInstance();
if (null != stateTable) {
StateContextImpl stateContext = new StateContextImpl(stateTable);
javaInstance.getContext().setStateContext(stateContext);
@@ -254,7 +259,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
stats.processTimeStart();
// process the message
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
// register end time
stats.processTimeEnd();
@@ -289,7 +296,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
- private void loadJars() throws Exception {
+ private ClassLoader loadJars() throws Exception {
+ ClassLoader fnClassLoader;
try {
log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
@@ -309,13 +317,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
log.info("Initialize function class loader for function {} at function cache manager",
instanceConfig.getFunctionDetails().getName());
- this.fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+ fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}
- // make sure the function class loader is accessible thread-locally
- Thread.currentThread().setContextClassLoader(fnClassLoader);
+ return fnClassLoader;
}
private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
@@ -425,23 +432,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
private void sendOutputMessage(Record srcRecord, Object output) {
+ if (!(this.sink instanceof PulsarSink)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
throw new RuntimeException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
}
private Record readInput() {
Record record;
+ if (!(this.source instanceof PulsarSource)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
record = this.source.read();
} catch (Exception e) {
stats.incrSourceExceptions(e);
log.info("Encountered exception in source read: ", e);
throw new RuntimeException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
// check record is valid
@@ -466,19 +483,29 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
if (source != null) {
+ if (!(this.source instanceof PulsarSource)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
source.close();
} catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
source = null;
}
if (sink != null) {
+ if (!(this.sink instanceof PulsarSink)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
sink.close();
} catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
sink = null;
}
@@ -667,11 +694,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
- object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
+ object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
- Thread.currentThread().getContextClassLoader());
+ this.functionClassLoader);
}
Class>[] typeArgs;
@@ -683,11 +710,22 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
this.source = (Source>) object;
- if (sourceSpec.getConfigs().isEmpty()) {
- this.source.open(new HashMap<>(), contextImpl);
- } else {
- this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
- new TypeToken