Commit cff8cc69 authored by lgcareer, committed by qiaozhanwei

[fix #1828] when the executor of a process instance is not the owner of the udf...

[fix #1828] when the executor of a process instance is not the owner of the udf resource, the path of the read resource file is incorrect (#1847)

* fix issue 1828: get the udf resource path error when creating a udf function

* update grantResources

* first verify whether udf resource is bound by udf function

* update grantResources

* update testListAuthorizedUdfFunc

* update getUserInfo in order to run successfully
Parent 9f597281
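The root cause behind #1828: HDFS paths for a shared resource were built from the *executor's* tenant directory, while the file actually lives under the *owner's* tenant directory. A minimal, self-contained sketch of the before/after path resolution; the `/dolphinscheduler/<tenant>/udfs` layout and the tenant names are illustrative assumptions, not taken from this diff.

```java
public class TenantPathFix {

    // mirrors the shape of HadoopUtils.getHdfsUdfFileName; the base dir is assumed
    static String hdfsUdfFileName(String tenantCode, String fileName) {
        return String.format("/dolphinscheduler/%s/udfs/%s", tenantCode, fileName);
    }

    public static void main(String[] args) {
        String ownerTenant = "tenant_owner";     // tenant of the resource owner
        String executorTenant = "tenant_runner"; // tenant of the process instance executor

        // before the fix: path derived from the executor's tenant -> file not found
        System.out.println(hdfsUdfFileName(executorTenant, "my-udf.jar"));

        // after the fix: path derived from the resource owner's tenant -> correct file
        System.out.println(hdfsUdfFileName(ownerTenant, "my-udf.jar"));
    }
}
```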
......@@ -180,9 +180,10 @@ public enum Status {
RESOURCE_SIZE_EXCEED_LIMIT(20007, "upload resource file size exceeds limit"),
RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified"),
UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar"),
HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail"),
RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!"),
RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!"),
HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail"),
RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs, please delete it or change the name!"),
RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} does not exist in hdfs!"),
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}"),
......
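A note on the renumbering above: HDFS_COPY_FAIL previously shared code 20009 with UDF_RESOURCE_SUFFIX_NOT_JAR, so shifting the later entries up by one also frees 20013 for the new UDF_RESOURCE_IS_BOUND status. A small guard test can keep such collisions from recurring; this is a hedged sketch assuming JUnit 4 and the getCode() accessor used elsewhere in this commit.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.dolphinscheduler.api.enums.Status;
import org.junit.Assert;
import org.junit.Test;

public class StatusCodeUniquenessTest {

    @Test
    public void statusCodesAreUnique() {
        Map<Integer, Status> seen = new HashMap<>();
        for (Status status : Status.values()) {
            // put returns the previous mapping, i.e. the other enum constant
            // that already claimed this code (the old 20009 collision)
            Status previous = seen.put(status.getCode(), status);
            Assert.assertNull("duplicate status code " + status.getCode()
                    + " shared by " + previous + " and " + status, previous);
        }
    }
}
```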
......@@ -16,12 +16,16 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.collections.BeanMap;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
......@@ -29,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.collections.BeanMap;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -277,15 +277,9 @@ public class ResourcesService extends BaseService {
String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();
// get origin and destination hdfs file names by resource type
String originHdfsFileName = "";
String destHdfsFileName = "";
if (resource.getType().equals(ResourceType.FILE)) {
originHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, originResourceName);
destHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, name);
} else if (resource.getType().equals(ResourceType.UDF)) {
originHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, originResourceName);
destHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
}
String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, originResourceName);
String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, name);
try {
if (HadoopUtils.getInstance().exists(originHdfsFileName)) {
logger.info("hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
......@@ -354,15 +348,8 @@ public class ResourcesService extends BaseService {
// save file to hdfs, and delete original file
String hdfsFilename = "";
String resourcePath = "";
if (type.equals(ResourceType.FILE)) {
hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name);
resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
} else if (type.equals(ResourceType.UDF)) {
hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode);
}
String hdfsFilename = HadoopUtils.getHdfsFileName(type, tenantCode, name);
String resourcePath = HadoopUtils.getHdfsDir(type, tenantCode);
try {
// if tenant dir not exists
if (!HadoopUtils.getInstance().exists(resourcePath)) {
......@@ -429,12 +416,19 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
//if the resource type is UDF, check whether it is bound by any UDF function
if (resource.getType() == ResourceType.UDF) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId});
if (CollectionUtils.isNotEmpty(udfFuncs)) {
logger.error("can't be deleted, because it is bound by UDF functions:{}", udfFuncs);
putMsg(result, Status.UDF_RESOURCE_IS_BOUND, udfFuncs.get(0).getFuncName());
return result;
}
}
String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
String hdfsFilename = "";
String tenantCode = userMapper.queryTenantCodeByUserId(resource.getUserId()).getTenantCode();
// delete hdfs file by type
hdfsFilename = getHdfsFileName(resource, tenantCode, hdfsFilename);
String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getAlias());
//delete data in database
resourcesMapper.deleteById(resourceId);
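Two things change in the delete path above: the tenant code is now resolved from the resource *owner* (`resource.getUserId()`) instead of the logged-in user, and a UDF resource still referenced by a UDF function can no longer be deleted. The guard can be read in isolation as below; the constructor wiring is an illustrative assumption, only the mapper call comes from the diff.

```java
import java.util.Collections;
import java.util.List;

import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;

public class UdfResourceDeleteGuard {

    private final UdfFuncMapper udfFuncMapper;

    public UdfResourceDeleteGuard(UdfFuncMapper udfFuncMapper) {
        this.udfFuncMapper = udfFuncMapper;
    }

    /**
     * @return the UDF functions still bound to this resource;
     *         deletion must be refused unless the list is empty
     */
    public List<UdfFunc> boundFunctions(Resource resource) {
        if (resource.getType() != ResourceType.UDF) {
            return Collections.emptyList();
        }
        return udfFuncMapper.listUdfByResourceId(new int[]{resource.getId()});
    }
}
```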
......@@ -466,7 +460,7 @@ public class ResourcesService extends BaseService {
String tenantCode = tenant.getTenantCode();
try {
String hdfsFilename = getHdfsFileName(type,tenantCode,name);
String hdfsFilename = HadoopUtils.getHdfsFileName(type, tenantCode, name);
if(HadoopUtils.getInstance().exists(hdfsFilename)){
logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name,hdfsFilename);
putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
......@@ -525,7 +519,7 @@ public class ResourcesService extends BaseService {
User user = userMapper.queryDetailsById(resource.getUserId());
String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();
// hdfs path
String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getAlias());
logger.info("resource hdfs path is {} ", hdfsFileName);
try {
if(HadoopUtils.getInstance().exists(hdfsFileName)){
......@@ -684,8 +678,8 @@ public class ResourcesService extends BaseService {
return result;
}
// get file hdfs path
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName);
// get resource file hdfs path
hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
logger.info("resource hdfs path is {} ", hdfsFileName);
......@@ -732,8 +726,7 @@ public class ResourcesService extends BaseService {
User user = userMapper.queryDetailsById(resource.getUserId());
String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();
String hdfsFileName = "";
hdfsFileName = getHdfsFileName(resource, tenantCode, hdfsFileName);
String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getAlias());
String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
logger.info("resource hdfs path is {} ", hdfsFileName);
......@@ -848,40 +841,6 @@ public class ResourcesService extends BaseService {
return result;
}
/**
* get hdfs file name
*
* @param resource resource
* @param tenantCode tenant code
* @param hdfsFileName hdfs file name
* @return hdfs file name
*/
private String getHdfsFileName(Resource resource, String tenantCode, String hdfsFileName) {
if (resource.getType().equals(ResourceType.FILE)) {
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
} else if (resource.getType().equals(ResourceType.UDF)) {
hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, resource.getAlias());
}
return hdfsFileName;
}
/**
* get hdfs file name
*
* @param resourceType resource type
* @param tenantCode tenant code
* @param hdfsFileName hdfs file name
* @return hdfs file name
*/
private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) {
if (resourceType.equals(ResourceType.FILE)) {
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName);
} else if (resourceType.equals(ResourceType.UDF)) {
hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName);
}
return hdfsFileName;
}
/**
* get authorized resource list
*
......
......@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
......@@ -39,6 +40,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
/**
* user service
......@@ -72,6 +74,9 @@ public class UsersService extends BaseService {
@Autowired
private AlertGroupMapper alertGroupMapper;
@Autowired
private UdfFuncMapper udfFuncMapper;
/**
* create user, only system admin have permission
......@@ -413,15 +418,14 @@ public class UsersService extends BaseService {
return result;
}
/**
* grant resource
*
* @param loginUser login user
* @param userId user id
* @param resourceIds resource id array
* @return grant result code
* @param loginUser login user
* @param userId user id
* @param resourceIds resource id array
* @return grant result code
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
......@@ -433,15 +437,28 @@ public class UsersService extends BaseService {
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
String[] resourcesIdArr = resourceIds.split(",");
//if the resource type is UDF, check whether the resources being revoked are still bound by any UDF function
Set<Integer> needAuthorizedIds = new HashSet<>();
if (StringUtils.isNotEmpty(resourceIds)) {
needAuthorizedIds = Arrays.stream(resourcesIdArr).map(Integer::parseInt).collect(Collectors.toSet());
}
List<Resource> udfResourceList = resourceMapper.queryResourceList("", 0, ResourceType.UDF.ordinal());
Set<Integer> allUdfResIds = udfResourceList.stream().map(Resource::getId).collect(Collectors.toSet());
allUdfResIds.removeAll(needAuthorizedIds);
List<UdfFunc> udfFuncs = udfFuncMapper.listUdfByResourceId(ArrayUtils.toPrimitive(allUdfResIds.toArray(new Integer[0])));
if (CollectionUtils.isNotEmpty(udfFuncs)) {
logger.error("can't be revoked, because it is bound by UDF functions:{}", udfFuncs);
putMsg(result, Status.UDF_RESOURCE_IS_BOUND, udfFuncs.get(0).getFuncName());
return result;
}
resourcesUserMapper.deleteResourceUser(userId, 0);
if (check(result, StringUtils.isEmpty(resourceIds), Status.SUCCESS)) {
return result;
}
String[] resourcesIdArr = resourceIds.split(",");
for (String resourceId : resourcesIdArr) {
Date now = new Date();
ResourcesUser resourcesUser = new ResourcesUser();
......
......@@ -18,10 +18,17 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -29,17 +36,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
@Transactional
@Rollback(true)
public class ResourcesServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class);
@Autowired
private ResourcesService resourcesService;
@Autowired
private ResourceMapper resourceMapper;
@Autowired
private UdfFuncMapper udfFuncMapper;
@Autowired
private UserMapper userMapper;
@Test
public void queryResourceList(){
......@@ -50,4 +70,81 @@ public class ResourcesServiceTest {
Map<String, Object> map = resourcesService.queryResourceList(loginUser, ResourceType.FILE);
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
}
@Test
public void testCreateResource(){
//create user
User loginUser = createGeneralUser("user1");
String resourceName = "udf-resource-1.jar";
String errorResourceName = "udf-resource-1";
MockMultipartFile udfResource = new MockMultipartFile(resourceName, resourceName, "multipart/form-data", "some content".getBytes());
Result result = resourcesService.createResource(loginUser, errorResourceName, "", ResourceType.UDF, udfResource);
Assert.assertEquals(result.getCode().intValue(),Status.RESOURCE_SUFFIX_FORBID_CHANGE.getCode());
List<Resource> resourceList = resourceMapper.queryResourceList(resourceName, loginUser.getId(), ResourceType.UDF.ordinal());
Assert.assertTrue(resourceList.size() == 0);
}
@Test
public void testDelete() throws Exception{
//create user
User loginUser = createGeneralUser("user1");
//create resource
Resource resource = createResource(loginUser,ResourceType.UDF,"udf-resource-1");
//create UDF function
UdfFunc udfFunc = createUdfFunc(loginUser, resource);
//delete resource
Result result = resourcesService.delete(loginUser, resource.getId());
Assert.assertEquals(result.getCode().intValue(),Status.UDF_RESOURCE_IS_BOUND.getCode());
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user,ResourceType type,String name){
//insertOne
Resource resource = new Resource();
resource.setAlias(String.format("%s-%s",name,user.getUserName()));
resource.setType(type);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* insert one udf
* @return
*/
private UdfFunc createUdfFunc(User user, Resource resource){
UdfFunc udfFunc = new UdfFunc();
udfFunc.setUserId(user.getId());
udfFunc.setFuncName("dolphin_udf_func");
udfFunc.setClassName("org.apache.dolphinscheduler.test.mr");
udfFunc.setType(UdfType.HIVE);
udfFunc.setResourceId(resource.getId());
udfFunc.setResourceName(resource.getAlias());
udfFunc.setCreateTime(new Date());
udfFunc.setUpdateTime(new Date());
udfFuncMapper.insert(udfFunc);
return udfFunc;
}
}
\ No newline at end of file
......@@ -22,10 +22,14 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.junit.After;
......@@ -41,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
......@@ -68,6 +73,10 @@ public class UsersServiceTest {
private DataSourceUserMapper datasourceUserMapper;
@Mock
private AlertGroupMapper alertGroupMapper;
@Mock
private ResourceMapper resourceMapper;
@Mock
private UdfFuncMapper udfFuncMapper;
private String queueName ="UsersServiceTestQueue";
......@@ -203,7 +212,7 @@ public class UsersServiceTest {
logger.info(result.toString());
//success
when(userMapper.selectById(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getAdminUser());
result = usersService.updateUser(1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
......@@ -218,8 +227,8 @@ public class UsersServiceTest {
User loginUser = new User();
try {
when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getUser());
when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getAdminUser());
when(userMapper.selectById(1)).thenReturn(getAdminUser());
//no operate
Map<String, Object> result = usersService.deleteUserById(loginUser,3);
......@@ -247,7 +256,7 @@ public class UsersServiceTest {
@Test
public void testGrantProject(){
when(userMapper.selectById(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getAdminUser());
User loginUser = new User();
String projectIds= "100000,120000";
Map<String, Object> result = usersService.grantProject(loginUser, 1, projectIds);
......@@ -268,20 +277,46 @@ public class UsersServiceTest {
public void testGrantResources(){
String resourceIds = "100000,120000";
when(userMapper.selectById(1)).thenReturn(getUser());
User loginUser = new User();
Map<String, Object> result = usersService.grantResources(loginUser, 1, resourceIds);
User needAuthorizedUser = new User();
needAuthorizedUser.setUserType(UserType.GENERAL_USER);
needAuthorizedUser.setId(100);
User generalUser = getGeneralUser();
User adminUser = getAdminUser();
when(userMapper.selectById(needAuthorizedUser.getId())).thenReturn(generalUser);
Map<String, Object> result = usersService.grantResources(generalUser, needAuthorizedUser.getId(), resourceIds);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS));
//user not exist
loginUser.setUserType(UserType.ADMIN_USER);
result = usersService.grantResources(loginUser, 2, resourceIds);
result = usersService.grantResources(adminUser, 2, resourceIds);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS));
//success
result = usersService.grantResources(loginUser, 1, resourceIds);
result = usersService.grantResources(adminUser, needAuthorizedUser.getId(), resourceIds);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
List<Resource> udfResourceList = new ArrayList<Resource>() {{
add(createResource(getAdminUser(), ResourceType.UDF, 100000));
add(createResource(getAdminUser(), ResourceType.UDF, 120000));
}};
when(resourceMapper.queryResourceList("", 0, ResourceType.UDF.ordinal())).thenReturn(udfResourceList);
//mock udf function list
UdfFunc udfFunc = createUdfFunc(getAdminUser(), 100000);
List<UdfFunc> udfFuncs = new ArrayList<>();
udfFuncs.add(udfFunc);
when(udfFuncMapper.listUdfByResourceId(new int[]{100000})).thenReturn(udfFuncs);
//fail if udf resource is already bound by the udf function
result = usersService.grantResources(adminUser, needAuthorizedUser.getId(), "120000");
Assert.assertEquals(Status.UDF_RESOURCE_IS_BOUND, result.get(Constants.STATUS));
result = usersService.grantResources(adminUser, needAuthorizedUser.getId(), "100000");
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
......@@ -289,7 +324,7 @@ public class UsersServiceTest {
public void testGrantUDFFunction(){
String udfIds = "100000,120000";
when(userMapper.selectById(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getAdminUser());
User loginUser = new User();
Map<String, Object> result = usersService.grantUDFFunction(loginUser, 1, udfIds);
logger.info(result.toString());
......@@ -309,7 +344,7 @@ public class UsersServiceTest {
public void testGrantDataSource(){
String datasourceIds = "100000,120000";
when(userMapper.selectById(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getAdminUser());
User loginUser = new User();
Map<String, Object> result = usersService.grantDataSource(loginUser, 1, datasourceIds);
logger.info(result.toString());
......@@ -350,7 +385,7 @@ public class UsersServiceTest {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
tempUser = (User) result.get(Constants.DATA_LIST);
//check userName
Assert.assertEquals("userTest0001",tempUser.getUserName());
Assert.assertEquals("general-user-0001",tempUser.getUserName());
}
......@@ -380,7 +415,7 @@ public class UsersServiceTest {
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
//exist user
when(userMapper.queryByUserNameAccurately("userTest0001")).thenReturn(getUser());
when(userMapper.queryByUserNameAccurately("userTest0001")).thenReturn(getAdminUser());
result = usersService.verifyUserName("userTest0001");
logger.info(result.toString());
Assert.assertEquals(Status.USER_NAME_EXIST.getMsg(), result.getMsg());
......@@ -430,8 +465,8 @@ public class UsersServiceTest {
User user = new User();
user.setUserType(UserType.GENERAL_USER);
user.setUserName("userTest0001");
user.setUserPassword("userTest0001");
user.setUserName("general-user-0001");
user.setUserPassword("general-user-0001");
return user;
}
......@@ -445,7 +480,7 @@ public class UsersServiceTest {
/**
* get user
*/
private User getUser(){
private User getAdminUser(){
User user = new User();
user.setUserType(UserType.ADMIN_USER);
......@@ -461,4 +496,37 @@ public class UsersServiceTest {
return tenant;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user, ResourceType type,int id){
//insertOne
Resource resource = new Resource();
resource.setId(id);
resource.setType(type);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create udf function
* @return udf function
*/
private UdfFunc createUdfFunc(User user, int resourceId){
UdfFunc udfFunc = new UdfFunc();
udfFunc.setUserId(user.getId());
udfFunc.setFuncName("dolphin_udf_func");
udfFunc.setClassName("org.apache.dolphinscheduler.test.mr");
udfFunc.setType(UdfType.HIVE);
udfFunc.setResourceId(resourceId);
udfFunc.setCreateTime(new Date());
udfFunc.setUpdateTime(new Date());
udfFuncMapper.insert(udfFunc);
return udfFunc;
}
}
\ No newline at end of file
......@@ -24,6 +24,7 @@ import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
......@@ -412,6 +413,22 @@ public class HadoopUtils implements Closeable {
}
}
/**
* get hdfs directory by resource type
*
* @param resourceType resource type
* @param tenantCode tenant code
* @return hdfs directory for the given resource type
*/
public static String getHdfsDir(ResourceType resourceType, String tenantCode) {
String hdfsDir = "";
if (resourceType.equals(ResourceType.FILE)) {
hdfsDir = getHdfsResDir(tenantCode);
} else if (resourceType.equals(ResourceType.UDF)) {
hdfsDir = getHdfsUdfDir(tenantCode);
}
return hdfsDir;
}
/**
* hdfs resource dir
*
......@@ -447,22 +464,42 @@ public class HadoopUtils implements Closeable {
* get absolute path and name for file on hdfs
*
* @param tenantCode tenant code
* @param filename file name
* @param fileName file name
* @return get absolute path and name for file on hdfs
*/
/**
* get hdfs file name
*
* @param resourceType resource type
* @param tenantCode tenant code
* @param fileName file name
* @return hdfs file name
*/
public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
}
/**
* get absolute path and name for resource file on hdfs
*
* @param tenantCode tenant code
* @param fileName file name
* @return get absolute path and name for file on hdfs
*/
public static String getHdfsFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
public static String getHdfsResourceFileName(String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
}
/**
* get absolute path and name for udf file on hdfs
*
* @param tenantCode tenant code
* @param filename file name
* @param fileName file name
* @return get absolute path and name for udf file on hdfs
*/
public static String getHdfsUdfFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename);
public static String getHdfsUdfFileName(String tenantCode, String fileName) {
return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
}
/**
......
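The two type-specific helpers (`getHdfsFilename`, `getHdfsUdfFilename`) collapse into `getHdfsDir`/`getHdfsFileName`, so callers select a path by `ResourceType` instead of branching themselves, as the ResourcesService hunks above now do. A usage sketch; the tenant code and file names are made up, and the printed layout depends on the configured HDFS base directory:

```java
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;

public class HdfsPathExample {

    public static void main(String[] args) {
        String tenantCode = "tenant_a"; // hypothetical tenant

        // resolves under the tenant's resources directory
        String filePath = HadoopUtils.getHdfsFileName(ResourceType.FILE, tenantCode, "report.sql");

        // resolves under the tenant's udfs directory
        String udfPath = HadoopUtils.getHdfsFileName(ResourceType.UDF, tenantCode, "my-udf.jar");

        System.out.println(filePath);
        System.out.println(udfPath);
    }
}
```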
......@@ -1549,10 +1549,11 @@ public class ProcessDao {
/**
* find tenant code by resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
public String queryTenantCodeByResName(String resName){
return resourceMapper.queryTenantCodeByResourceName(resName);
public String queryTenantCodeByResName(String resName, ResourceType resourceType){
return resourceMapper.queryTenantCodeByResourceName(resName, resourceType.ordinal());
}
/**
......
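`queryTenantCodeByResName` now forwards the enum's ordinal, which is what the `type` column in `t_ds_resources` stores; the old SQL hard-coded `res.type=0`, i.e. `ResourceType.FILE`, which is why UDF lookups resolved the wrong tenant. A one-line check of that mapping (the UDF ordinal of 1 is inferred from the old hard-coded 0 for FILE, not stated in this diff):

```java
import org.apache.dolphinscheduler.common.enums.ResourceType;

public class ResourceTypeOrdinalCheck {

    public static void main(String[] args) {
        // FILE must stay at ordinal 0 to match existing rows written as res.type=0
        System.out.println(ResourceType.FILE.ordinal()); // 0
        System.out.println(ResourceType.UDF.ordinal());  // 1 (assumed)
    }
}
```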
......@@ -77,12 +77,20 @@ public interface ResourceMapper extends BaseMapper<Resource> {
List<Resource> queryResourceExceptUserId(@Param("userId") int userId);
/* *//**
* query tenant code by name
* @param resName resource name
* @return tenant code
*//*
String queryTenantCodeByResourceName(@Param("resName") String resName);*/
/**
* query tenant code by name
* @param resName resource name
* @param resType resource type
* @return tenant code
*/
String queryTenantCodeByResourceName(@Param("resName") String resName);
String queryTenantCodeByResourceName(@Param("resName") String resName, @Param("resType") int resType);
/**
* list authorized resource
......
......@@ -86,4 +86,12 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
<T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
/**
* list UDF by resource id
* @param resourceIds resource id array
* @return UDF function list
*/
List<UdfFunc> listUdfByResourceId(@Param("resourceIds") int[] resourceIds);
}
......@@ -70,7 +70,7 @@
<select id="queryTenantCodeByResourceName" resultType="java.lang.String">
select tenant_code
from t_ds_tenant t, t_ds_user u, t_ds_resources res
where t.id = u.tenant_id and u.id = res.user_id and res.type=0
where t.id = u.tenant_id and u.id = res.user_id and res.type=#{resType}
and res.alias= #{resName}
</select>
<select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
......
......@@ -87,4 +87,15 @@
</foreach>
</if>
</select>
<select id="listUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where 1=1
<if test="resourceIds != null and resourceIds != ''">
and resource_id in
<foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>
\ No newline at end of file
......@@ -288,12 +288,12 @@ public class ResourceMapperTest {
resource.setUserId(user.getId());
resourceMapper.updateById(resource);
String resource1 = resourceMapper.queryTenantCodeByResourceName(
resource.getAlias()
String tenantCode = resourceMapper.queryTenantCodeByResourceName(
resource.getAlias(),resource.getType().ordinal()
);
Assert.assertEquals(resource1, "ut tenant code for resource");
Assert.assertEquals(tenantCode, "ut tenant code for resource");
resourceMapper.deleteById(resource.getId());
}
......
......@@ -20,8 +20,10 @@ package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UDFUser;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
......@@ -55,6 +57,9 @@ public class UdfFuncMapperTest {
@Autowired
UDFUserMapper udfUserMapper;
@Autowired
ResourceMapper resourceMapper;
/**
* insert one udf
* @return UdfFunc
......@@ -91,6 +96,24 @@ public class UdfFuncMapperTest {
return udfFunc;
}
/**
* insert one udf
* @return
*/
private UdfFunc insertOne(User user,Resource resource){
UdfFunc udfFunc = new UdfFunc();
udfFunc.setUserId(user.getId());
udfFunc.setFuncName("dolphin_udf_func");
udfFunc.setClassName("org.apache.dolphinscheduler.test.mr");
udfFunc.setType(UdfType.HIVE);
udfFunc.setResourceId(resource.getId());
udfFunc.setResourceName(resource.getAlias());
udfFunc.setCreateTime(new Date());
udfFunc.setUpdateTime(new Date());
udfFuncMapper.insert(udfFunc);
return udfFunc;
}
/**
* insert one user
* @return User
......@@ -141,6 +164,20 @@ public class UdfFuncMapperTest {
return udfUser;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createUdfResource(User user,String resourceName){
Resource resource = new Resource();
resource.setAlias(resourceName);
resource.setType(ResourceType.UDF);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create general user
* @return User
......@@ -319,4 +356,25 @@ public class UdfFuncMapperTest {
authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), ArrayUtils.toObject(udfFuncIds));
Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(ArrayUtils.toObject(udfFuncIds))));
}
@Test
public void testListUdfByResourceId(){
//create general user
User generalUser = createGeneralUser("user1");
//create udf resource
Resource udfResource1 = createUdfResource(generalUser,"udf-resource-1");
//create udf function
UdfFunc udfFunc1 = insertOne(generalUser,udfResource1);
List<UdfFunc> udfFuncList = udfFuncMapper.listUdfByResourceId(new int[]{udfResource1.getId()});
Assert.assertTrue(udfFuncList.size() == 1);
//create udf resource
Resource udfResource2 = createUdfResource(generalUser,"udf-resource-2");
//create udf function
UdfFunc udfFunc2 = insertOne(generalUser,udfResource2);
udfFuncList = udfFuncMapper.listUdfByResourceId(new int[]{udfResource1.getId(),udfResource2.getId()});
Assert.assertTrue(udfFuncList.size() == 2);
}
}
\ No newline at end of file
......@@ -23,10 +23,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
......@@ -42,29 +40,18 @@ public class UDFUtils {
/**
* create function list
* @param udfFuncs udf functions
* @param tenantCode tenant code
* @param udfTenantCodeMap key is tenant code, value is udf function
* @param logger logger
* @return create function list
*/
public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){
// get hive udf jar path
String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
logger.info("hive udf jar path : {}" , hiveUdfJarPath);
// is the root directory of udf defined
if (StringUtils.isEmpty(hiveUdfJarPath)) {
logger.error("not define hive udf jar path");
throw new RuntimeException("hive udf jar base path not defined ");
}
Set<String> resources = getFuncResouces(udfFuncs);
public static List<String> createFuncs(Map<String,UdfFunc> udfTenantCodeMap, Logger logger){
List<String> funcList = new ArrayList<>();
// build jar sql
buildJarSql(funcList, resources, hiveUdfJarPath);
buildJarSql(funcList, udfTenantCodeMap);
// build temp function sql
buildTempFuncSql(funcList, udfFuncs);
buildTempFuncSql(funcList, udfTenantCodeMap.values().stream().collect(Collectors.toList()));
return funcList;
}
......@@ -72,18 +59,20 @@ public class UDFUtils {
/**
* build jar sql
* @param sqls sql list
* @param resources resource set
* @param uploadPath upload path
* @param udfTenantCodeMap key is tenant code, value is udf function
*/
private static void buildJarSql(List<String> sqls, Set<String> resources, String uploadPath) {
private static void buildJarSql(List<String> sqls, Map<String,UdfFunc> udfTenantCodeMap) {
String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
if (!uploadPath.startsWith("hdfs:")) {
uploadPath = defaultFS + uploadPath;
}
for (String resource : resources) {
sqls.add(String.format("add jar %s/%s", uploadPath, resource));
for (Map.Entry<String, UdfFunc> entry : udfTenantCodeMap.entrySet()) {
String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getKey());
if (!uploadPath.startsWith("hdfs:")) {
uploadPath = defaultFS + uploadPath;
}
sqls.add(String.format("add jar %s/%s", uploadPath, entry.getValue().getResourceName()));
}
}
/**
......
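After this refactor each jar is resolved against its owning tenant's udf directory rather than a single tenant passed in by the task. The statements the rewritten helper is expected to emit look like the following; the paths, the defaultFS value, and the function name are illustrative stand-ins, not output captured from the code above.

```java
// Illustrative only: shows the shape of the statements built by
// buildJarSql/buildTempFuncSql for a tenant-code -> jar mapping.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UdfSqlSketch {

    public static void main(String[] args) {
        Map<String, String> tenantToJar = new LinkedHashMap<>();
        tenantToJar.put("tenant_a", "udf-a.jar"); // hypothetical resources
        tenantToJar.put("tenant_b", "udf-b.jar");

        String defaultFS = "hdfs://nameservice1"; // stands in for fs.defaultFS

        List<String> sqls = new ArrayList<>();
        for (Map.Entry<String, String> e : tenantToJar.entrySet()) {
            // per-tenant udf dir, as HadoopUtils.getHdfsUdfDir would return it
            String uploadPath = "/dolphinscheduler/" + e.getKey() + "/udfs";
            if (!uploadPath.startsWith("hdfs:")) {
                uploadPath = defaultFS + uploadPath;
            }
            sqls.add(String.format("add jar %s/%s", uploadPath, e.getValue()));
        }
        sqls.add("create temporary function my_func as 'com.example.MyUdf'");
        sqls.forEach(System.out::println);
    }
}
```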
......@@ -23,6 +23,7 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
......@@ -311,8 +312,8 @@ public class TaskScheduleThread implements Runnable {
if (!resFile.exists()) {
try {
// query the tenant code of the resource according to the name of the resource
String tentnCode = processDao.queryTenantCodeByResName(res);
String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res);
String tenantCode = processDao.queryTenantCodeByResName(res, ResourceType.FILE);
String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, res);
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + res, false, true);
......
......@@ -24,10 +24,7 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.job.db.BaseDataSource;
import org.apache.dolphinscheduler.common.job.db.DataSourceFactory;
import org.apache.dolphinscheduler.common.process.Property;
......@@ -176,7 +173,14 @@ public class SqlTask extends AbstractTask {
// check udf permission
checkUdfPermission(ArrayUtils.toObject(idsArray));
List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(idsArray);
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
Map<String, UdfFunc> udfFuncMap = new HashMap<>();
for (UdfFunc udfFunc : udfFuncList) {
String tenantCode = processDao.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
udfFuncMap.put(tenantCode, udfFunc);
}
createFuncs = UDFUtils.createFuncs(udfFuncMap, logger);
}
// execute sql task
......
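SqlTask now resolves a tenant code per UDF resource and hands `createFuncs` a tenant-code→UdfFunc map. Note the map keeps one function per tenant; if several functions from the same tenant can appear in one task, grouping into a list-valued map is the natural extension. The sketch below shows that variant under that assumption, with a plain `Function` standing in for `processDao.queryTenantCodeByResName`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.dolphinscheduler.dao.entity.UdfFunc;

public class UdfTenantGroupingSketch {

    /**
     * Groups UDF functions by the tenant code of their backing resource,
     * so that multiple functions per tenant are all preserved.
     */
    static Map<String, List<UdfFunc>> groupByTenant(List<UdfFunc> udfFuncList,
                                                    Function<String, String> resolveTenantCode) {
        Map<String, List<UdfFunc>> byTenant = new HashMap<>();
        for (UdfFunc udfFunc : udfFuncList) {
            // resolveTenantCode stands in for processDao.queryTenantCodeByResName(..., ResourceType.UDF)
            String tenantCode = resolveTenantCode.apply(udfFunc.getResourceName());
            byTenant.computeIfAbsent(tenantCode, k -> new ArrayList<>()).add(udfFunc);
        }
        return byTenant;
    }
}
```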