提交 d7b768d1 编写于 作者: L lenboo

Merge remote-tracking branch 'upstream/1.3.1-release' into 131

......@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* SQL Task ExecutionContext
......@@ -38,9 +38,9 @@ public class SQLTaskExecutionContext implements Serializable {
*/
private String connectionParams;
/**
* udf function list
* udf function tenant code map
*/
private List<UdfFunc> udfFuncList;
private Map<UdfFunc,String> udfFuncTenantCodeMap;
public int getWarningGroupId() {
......@@ -51,12 +51,12 @@ public class SQLTaskExecutionContext implements Serializable {
this.warningGroupId = warningGroupId;
}
public List<UdfFunc> getUdfFuncList() {
return udfFuncList;
public Map<UdfFunc, String> getUdfFuncTenantCodeMap() {
return udfFuncTenantCodeMap;
}
public void setUdfFuncList(List<UdfFunc> udfFuncList) {
this.udfFuncList = udfFuncList;
public void setUdfFuncTenantCodeMap(Map<UdfFunc, String> udfFuncTenantCodeMap) {
this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
}
public String getConnectionParams() {
......@@ -72,7 +72,7 @@ public class SQLTaskExecutionContext implements Serializable {
return "SQLTaskExecutionContext{" +
"warningGroupId=" + warningGroupId +
", connectionParams='" + connectionParams + '\'' +
", udfFuncList=" + udfFuncList +
", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap +
'}';
}
}
......@@ -324,7 +324,13 @@ public class TaskPriorityQueueConsumer extends Thread{
}
List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
Map<UdfFunc,String> udfFuncMap = new HashMap<>();
for(UdfFunc udfFunc : udfFuncList) {
String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
udfFuncMap.put(udfFunc,tenantCode);
}
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
}
}
......@@ -366,7 +372,7 @@ public class TaskPriorityQueueConsumer extends Thread{
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (projectResourceFiles != null) {
if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
......
......@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
......@@ -24,10 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.slf4j.Logger;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
......@@ -43,53 +42,44 @@ public class UDFUtils {
/**
* create function list
* @param udfFuncs udf functions
* @param tenantCode tenant code
* @param logger logger
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
* @param logger logger
* @return create function list
*/
public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){
public static List<String> createFuncs(Map<UdfFunc,String> udfFuncTenantCodeMap, Logger logger){
if (CollectionUtils.isEmpty(udfFuncs)){
if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
logger.info("can't find udf function resource");
return null;
}
// get hive udf jar path
String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
logger.info("hive udf jar path : {}" , hiveUdfJarPath);
// is the root directory of udf defined
if (StringUtils.isEmpty(hiveUdfJarPath)) {
logger.error("not define hive udf jar path");
throw new RuntimeException("hive udf jar base path not defined ");
}
Set<String> resources = getFuncResouces(udfFuncs);
List<String> funcList = new ArrayList<>();
// build jar sql
buildJarSql(funcList, resources, hiveUdfJarPath);
buildJarSql(funcList, udfFuncTenantCodeMap);
// build temp function sql
buildTempFuncSql(funcList, udfFuncs);
buildTempFuncSql(funcList, udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
return funcList;
}
/**
* build jar sql
* @param sqls sql list
* @param resources resource set
* @param uploadPath upload path
* @param sqls sql list
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
*/
private static void buildJarSql(List<String> sqls, Set<String> resources, String uploadPath) {
private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> udfFuncTenantCodeMap) {
String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
if (!uploadPath.startsWith("hdfs:")) {
uploadPath = defaultFS + uploadPath;
}
for (String resource : resources) {
sqls.add(String.format("add jar %s/%s", uploadPath, resource));
Set<Map.Entry<UdfFunc,String>> entries = udfFuncTenantCodeMap.entrySet();
for (Map.Entry<UdfFunc,String> entry:entries){
String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
if (!uploadPath.startsWith("hdfs:")) {
uploadPath = defaultFS + uploadPath;
}
sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName()));
}
}
/**
......@@ -106,20 +96,5 @@ public class UDFUtils {
}
}
/**
* get the resource names of all functions
* @param udfFuncs udf function list
* @return
*/
private static Set<String> getFuncResouces(List<UdfFunc> udfFuncs) {
Set<String> resources = new HashSet<>();
for (UdfFunc udfFunc : udfFuncs) {
resources.add(udfFunc.getResourceName());
}
return resources;
}
}
......@@ -132,8 +132,7 @@ public class SqlTask extends AbstractTask {
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
taskExecutionContext.getTenantCode(),
List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
logger);
// execute sql task
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册