未验证 提交 ea23cdff 编写于 作者: B Boyang Jerry Peng 提交者: GitHub

Don't create new instances of user classes during validation (#4281)

* Don't create new instances of user classes during validation

* fixing tests

* uncomment functions
上级 39004c83
......@@ -86,46 +86,33 @@ public class FunctionCommon {
}
}
public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader) {
Object userClass = createInstance(functionConfig.getClassName(), classLoader);
public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig, ClassLoader classLoader) throws ClassNotFoundException {
boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null;
return getFunctionTypes(userClass, isWindowConfigPresent);
Class functionClass = classLoader.loadClass(functionConfig.getClassName());
return getFunctionTypes(functionClass, isWindowConfigPresent);
}
public static Class<?>[] getFunctionTypes(Object userClass, boolean isWindowConfigPresent) {
public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfigPresent) {
Class<?>[] typeArgs;
// if window function
if (isWindowConfigPresent) {
if (userClass instanceof WindowFunction) {
WindowFunction function = (WindowFunction) userClass;
if (function == null) {
throw new IllegalArgumentException(
String.format("The WindowFunction class %s could not be instantiated", userClass));
}
typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, function.getClass());
if (WindowFunction.class.isAssignableFrom(userClass)) {
typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, userClass);
} else {
java.util.function.Function function = (java.util.function.Function) userClass;
if (function == null) {
throw new IllegalArgumentException(
String.format("The Java util function class %s could not be instantiated", userClass));
}
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
if (!typeArgs[0].equals(Collection.class)) {
throw new IllegalArgumentException("Window function must take a collection as input");
}
Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
typeArgs[0] = (Class<?>) actualInputType;
}
} else {
if (userClass instanceof Function) {
Function pulsarFunction = (Function) userClass;
typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
if (Function.class.isAssignableFrom(userClass)) {
typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
} else {
java.util.function.Function function = (java.util.function.Function) userClass;
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, userClass);
}
}
......@@ -200,30 +187,20 @@ public class FunctionCommon {
}
public static Class<?> getSourceType(String className, ClassLoader classloader) {
public static Class<?> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
Object userClass = Reflections.createInstance(className, classloader);
Class<?> typeArg;
Source source = (Source) userClass;
if (source == null) {
throw new IllegalArgumentException(String.format("The Pulsar source class %s could not be instantiated",
className));
}
typeArg = TypeResolver.resolveRawArgument(Source.class, source.getClass());
Class userClass = classLoader.loadClass(className);
Class<?> typeArg = TypeResolver.resolveRawArgument(Source.class, userClass);
return typeArg;
}
public static Class<?> getSinkType(String className, ClassLoader classLoader) {
public static Class<?> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
Object userClass = Reflections.createInstance(className, classLoader);
Class<?> typeArg;
Sink sink = (Sink) userClass;
if (sink == null) {
throw new IllegalArgumentException(String.format("The Pulsar sink class %s could not be instantiated",
className));
}
typeArg = TypeResolver.resolveRawArgument(Sink.class, sink.getClass());
Class userClass = classLoader.loadClass(className);
Class<?> typeArg = TypeResolver.resolveRawArgument(Sink.class, userClass);
return typeArg;
}
......
......@@ -50,7 +50,12 @@ public class FunctionConfigUtils {
Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
if (classLoader != null) {
typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
try {
typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
}
}
}
......@@ -363,7 +368,14 @@ public class FunctionConfigUtils {
}
private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
Class<?>[] typeArgs;
try {
typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionConfig.getClassName()), e);
}
// inputs use default schema, so there is no check needed there
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
......@@ -614,6 +626,7 @@ public class FunctionConfigUtils {
} else {
throw new IllegalArgumentException("Function Package is not provided");
}
doJavaChecks(functionConfig, classLoader);
return classLoader;
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
......
......@@ -345,7 +345,12 @@ public class SinkConfigUtils {
tmptypeArg = getSinkType(sinkClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
try {
tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
......@@ -362,7 +367,12 @@ public class SinkConfigUtils {
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
}
typeArg = getSinkType(sinkClassName, classLoader);
try {
typeArg = getSinkType(sinkClassName, classLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
}
}
if (sinkConfig.getTopicToSerdeClassName() != null) {
......
......@@ -250,7 +250,12 @@ public class SourceConfigUtils {
tmptypeArg = getSourceType(sourceClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
try {
tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
......@@ -267,7 +272,12 @@ public class SourceConfigUtils {
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract source class from archive", e1);
}
typeArg = getSourceType(sourceClassName, classLoader);
try {
typeArg = getSourceType(sourceClassName, classLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
}
}
// Only one of serdeClassName or schemaType should be set
......
......@@ -144,11 +144,17 @@ public class ValidatorUtils {
}
// validate function class-type
Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader);
Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionObject, false);
Class functionClass;
try {
functionClass = classLoader.loadClass(functionDetailsBuilder.getClassName());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Function class %s must be in class path", functionDetailsBuilder.getClassName()), e);
}
Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionClass, false);
if (!(functionObject instanceof org.apache.pulsar.functions.api.Function)
&& !(functionObject instanceof java.util.function.Function)) {
if (!(org.apache.pulsar.functions.api.Function.class.isAssignableFrom(functionClass))
&& !(java.util.function.Function.class.isAssignableFrom(functionClass))) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}
......
......@@ -58,8 +58,8 @@ public class ConnectorUtils {
try {
// Try to load source class and check it implements Source interface
Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
if (!(instance instanceof Source)) {
Class sourceClass = ncl.loadClass(conf.getSourceClass());
if (!(Source.class.isAssignableFrom(sourceClass))) {
throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
+ Source.class.getName());
}
......@@ -86,8 +86,8 @@ public class ConnectorUtils {
try {
// Try to load source class and check it implements Sink interface
Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
if (!(instance instanceof Sink)) {
Class sinkClass = ncl.loadClass(conf.getSinkClass());
if (!(Sink.class.isAssignableFrom(sinkClass))) {
throw new IOException(
"Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
}
......
......@@ -390,7 +390,7 @@ public class FunctionActioner {
File.separatorChar);
}
private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException {
private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
if (functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
......@@ -423,7 +423,7 @@ public class FunctionActioner {
}
private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
throws IOException {
throws IOException, ClassNotFoundException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
String typeArg = getSourceType(className, ncl).getName();
......@@ -441,7 +441,7 @@ public class FunctionActioner {
}
private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
throws IOException {
throws IOException, ClassNotFoundException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
String typeArg = getSinkType(className, ncl).getName();
......
......@@ -339,7 +339,7 @@ public class FunctionApiV2ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "User class must be in class path")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function class UnknownClass must be in class path")
public void testRegisterFunctionWrongClassName() {
try {
testRegisterFunctionMissingArguments(
......
......@@ -46,9 +46,9 @@ import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
......@@ -92,7 +92,7 @@ import static org.testng.Assert.assertEquals;
* Unit test of {@link FunctionApiV2Resource}.
*/
@PrepareForTest({WorkerUtils.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
@Slf4j
public class FunctionApiV3ResourceTest {
......@@ -332,7 +332,7 @@ public class FunctionApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "User class must be in class path")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function class UnknownClass must be in class path")
public void testRegisterFunctionWrongClassName() {
try {
testRegisterFunctionMissingArguments(
......
......@@ -225,7 +225,7 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Name is not provided")
public void testRegisterSinkMissingFunctionName() {
public void testRegisterSinkMissingSinkName() {
try {
testRegisterSinkMissingArguments(
tenant,
......@@ -263,6 +263,26 @@ public class SinkApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink class UnknownClass must be in class path")
public void testRegisterSinkWrongClassName() {
try {
testRegisterSinkMissingArguments(
tenant,
namespace,
sink,
mockedInputStream,
mockedFormData,
topicsToSerDeClassName,
"UnknownClass",
parallelism,
null
);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "zip file is empty")
public void testRegisterSinkMissingPackageDetails() {
try {
......@@ -489,7 +509,7 @@ public class SinkApiV3ResourceTest {
RequestResult rr = new RequestResult()
.setSuccess(true)
.setMessage("source registered");
.setMessage("sink registered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
......@@ -914,7 +934,7 @@ public class SinkApiV3ResourceTest {
}
@Test
public void testUpdateSinkWithUrl() throws IOException {
public void testUpdateSinkWithUrl() throws IOException, ClassNotFoundException {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
......
......@@ -242,6 +242,27 @@ public class SourceApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source class UnknownClass must be in class path")
public void testRegisterSourceWrongClassName() {
try {
testRegisterSourceMissingArguments(
tenant,
namespace,
source,
mockedInputStream,
mockedFormData,
outputTopic,
outputSerdeClassName,
"UnknownClass",
parallelism,
null
);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
public void testRegisterSourceMissingPackage() {
try {
......@@ -443,6 +464,7 @@ public class SourceApiV3ResourceTest {
}
}
@Test
public void testRegisterSourceSuccess() throws Exception {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
......@@ -451,6 +473,8 @@ public class SourceApiV3ResourceTest {
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
RequestResult rr = new RequestResult()
......@@ -928,7 +952,7 @@ public class SourceApiV3ResourceTest {
}
@Test
public void testUpdateSourceWithUrl() throws IOException {
public void testUpdateSourceWithUrl() throws IOException, ClassNotFoundException {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册