Unverified · Commit 22ec6c06 authored by S Simon, committed by GitHub

[Feature][jsonsplit]Refactor process lineage (#5034)

* Modify Project and ProjectUser Mapper

* Modify Project and ProjectUser Mapper

* project_code is bigint(20)

* modify ERROR name

* modify saveProcessDefine, remove the duplicate code with createTaskAndRelation

* modify import/export process definition, add genProcessData

* fix ut and bug

* code style

* replace project_id with code

* conflicts solve

* conflicts solve

* conflicts solve

* bugfix

* modify listResources method and remove getResourceIds method

* 1

* conflicts solve

* modify listResources method and remove getResourceIds method

* modify listResources method and remove getResourceIds method

* replace processDefinitionVersion with processDefinitionLog

* codestyle

* codestyle

* add mapper module ut

* codestyle

* fix ProcessInstanceMapperTest

* codestyle

* conflicts solve

* conflicts solve

* conflicts solve

* conflicts solve

* conflicts solve

* fix ProcessInstanceMapperTest

* fix ProjectMapperTest/ProjectUserMapperTest/ScheduleMapperTest

* fix ProjectMapperTest/ProjectUserMapperTest/ScheduleMapperTest

* fix TaskInstanceMapperTest

* add TaskDefinitionLogMapperTest/TaskDefinitionMapperTest and bugfix

* codestyle

* codestyle

* Refactor process lineage

* Refactor process lineage

* codestyle

* codestyle

* Refactor process lineage

* Refactor process lineage
Parent a574d2e3
......@@ -20,9 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
......@@ -45,6 +47,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProjectMapper projectMapper;
......@@ -58,56 +63,71 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
return result;
}
private void getWorkFlowRelationRecursion(Set<Integer> ids, List<WorkFlowRelation> workFlowRelations, Set<Integer> sourceIds) {
for (int id : ids) {
sourceIds.addAll(ids);
List<WorkFlowRelation> workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id);
if (workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) {
Set<Integer> idsTmp = new HashSet<>();
for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp) {
if (!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) {
idsTmp.add(workFlowRelation.getTargetWorkFlowId());
private void getRelation(Map<Integer, WorkFlowLineage> workFlowLineageMap,
Set<WorkFlowRelation> workFlowRelations,
ProcessLineage processLineage) {
List<ProcessLineage> relations = workFlowLineageMapper.queryCodeRelation(
processLineage.getPostTaskCode(), processLineage.getPostTaskVersion()
, processLineage.getProcessDefinitionCode(), processLineage.getProjectCode());
for (ProcessLineage relation : relations) {
if (relation.getProcessDefinitionCode() != null) {
relation.setPreTaskCode(processLineage.getPostTaskCode());
relation.setPreTaskVersion(processLineage.getPostTaskVersion());
WorkFlowLineage pre = workFlowLineageMapper
.queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode());
// queryWorkFlowLineageByCode returns the node with an empty sourceWorkFlowId
if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) {
workFlowLineageMap.put(pre.getWorkFlowId(), pre);
}
WorkFlowLineage post = workFlowLineageMapper
.queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(), relation.getProjectCode());
if (workFlowLineageMap.containsKey(post.getWorkFlowId())) {
WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId());
String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId();
if (sourceWorkFlowId.equals("")) {
workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
} else {
workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + pre.getWorkFlowId());
}
} else {
post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId()));
workFlowLineageMap.put(post.getWorkFlowId(), post);
}
WorkFlowRelation workFlowRelation = new WorkFlowRelation();
workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId());
workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId());
if (workFlowRelations.contains(workFlowRelation)) {
continue;
}
workFlowRelations.addAll(workFlowRelationsTmp);
getWorkFlowRelationRecursion(idsTmp, workFlowRelations, sourceIds);
workFlowRelations.add(workFlowRelation);
getRelation(workFlowLineageMap, workFlowRelations, relation);
}
}
}
@Override
public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int projectId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.selectById(projectId);
List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByIds(ids, project.getCode());
Map<String, Object> workFlowLists = new HashMap<>();
Set<Integer> idsV = new HashSet<>();
if (ids == null || ids.isEmpty()) {
for (WorkFlowLineage workFlowLineage : workFlowLineageList) {
idsV.add(workFlowLineage.getWorkFlowId());
}
} else {
idsV = ids;
}
List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
Set<Integer> sourceIds = new HashSet<>();
getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds);
Set<Integer> idSet = new HashSet<>();
//If the incoming parameter is not empty, you need to add downstream workflow detail attributes
if (ids != null && !ids.isEmpty()) {
for (WorkFlowRelation workFlowRelation : workFlowRelations) {
idSet.add(workFlowRelation.getTargetWorkFlowId());
}
for (int id : ids) {
idSet.remove(id);
}
if (!idSet.isEmpty()) {
workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, project.getCode()));
}
List<ProcessLineage> processLineages = workFlowLineageMapper.queryRelationByIds(ids, project.getCode());
Map<Integer, WorkFlowLineage> workFlowLineages = new HashMap<>();
Set<WorkFlowRelation> workFlowRelations = new HashSet<>();
for (ProcessLineage processLineage : processLineages) {
getRelation(workFlowLineages, workFlowRelations, processLineage);
}
workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineageList);
Map<String, Object> workFlowLists = new HashMap<>();
workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages.values());
workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
result.put(Constants.DATA_LIST, workFlowLists);
putMsg(result, Status.SUCCESS);
......
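For readers following the new getRelation logic above: it walks the task-relation graph from each seed ProcessLineage, registering every process definition it reaches in workFlowLineageMap and recording one WorkFlowRelation edge per upstream/downstream pair, stopping when an edge has already been seen. Below is a minimal, self-contained sketch of the same traversal pattern over a plain adjacency map; the Edge record, the walk helper, and all ids are illustrative stand-ins, not the project's classes or data.

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LineageTraversalSketch {

    // Illustrative edge between two workflow ids (mirrors WorkFlowRelation's equals/hashCode contract).
    record Edge(long source, long target) { }

    /**
     * Walk downstream relations from a seed workflow, collecting every reachable
     * workflow id and one Edge per source->target pair. The Set de-duplicates
     * edges, and skipping already-seen edges keeps cycles from recursing forever.
     */
    static void walk(long current,
                     Map<Long, List<Long>> downstream, // workflow -> direct downstream workflows
                     Set<Long> workFlows,
                     Set<Edge> edges) {
        workFlows.add(current);
        for (long next : downstream.getOrDefault(current, List.of())) {
            Edge edge = new Edge(current, next);
            if (!edges.add(edge)) {
                continue; // relation already recorded, stop here
            }
            walk(next, downstream, workFlows, edges);
        }
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> downstream = Map.of(
                1L, List.of(2L, 3L),
                2L, List.of(3L));
        Set<Long> workFlows = new HashSet<>();
        Set<Edge> edges = new HashSet<>();
        walk(1L, downstream, workFlows, edges);
        System.out.println(workFlows); // [1, 2, 3]
        System.out.println(edges);     // three distinct edges, no duplicates
    }
}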
......@@ -66,8 +66,6 @@ public class WorkFlowLineageServiceTest {
ids.add(1);
ids.add(2);
when(workFlowLineageMapper.queryByIds(ids, 1L)).thenReturn(getWorkFlowLineages());
when(workFlowLineageMapper.querySourceTarget(1)).thenReturn(getWorkFlowRelation());
Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(ids,1);
Map<String, Object> workFlowLists = (Map<String, Object>)result.get(Constants.DATA_LIST);
List<WorkFlowLineage> workFlowLineages = (List<WorkFlowLineage>)workFlowLists.get("workFlowList");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
/**
* Process lineage
*/
public class ProcessLineage {
/**
* project code
*/
private Long projectCode;
/**
* post task code
*/
private Long postTaskCode;
/**
* post task version
*/
private int postTaskVersion;
/**
* pre task code
*/
private Long preTaskCode;
/**
* pre task version
*/
private int preTaskVersion;
/**
* process definition code
*/
private Long processDefinitionCode;
/**
* process definition version
*/
private int processDefinitionVersion;
public Long getProjectCode() {
return projectCode;
}
public void setProjectCode(Long projectCode) {
this.projectCode = projectCode;
}
public Long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public void setPostTaskCode(Long postTaskCode) {
this.postTaskCode = postTaskCode;
}
public Long getPreTaskCode() {
return preTaskCode;
}
public void setPreTaskCode(Long preTaskCode) {
this.preTaskCode = preTaskCode;
}
public int getPreTaskVersion() {
return preTaskVersion;
}
public void setPreTaskVersion(int preTaskVersion) {
this.preTaskVersion = preTaskVersion;
}
public int getPostTaskVersion() {
return postTaskVersion;
}
public void setPostTaskVersion(int postTaskVersion) {
this.postTaskVersion = postTaskVersion;
}
public Long getPostTaskCode() {
return postTaskCode;
}
}
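ProcessLineage is a plain carrier for one t_ds_process_task_relation row plus its project code; the pre_* fields point at the upstream task and the post_* fields at the downstream task inside the same definition. A small illustrative construction with made-up codes:

ProcessLineage lineage = new ProcessLineage();
lineage.setProjectCode(100L);
lineage.setProcessDefinitionCode(200L);   // definition the relation row belongs to (example value)
lineage.setProcessDefinitionVersion(1);
lineage.setPreTaskCode(10L);              // upstream task (example value)
lineage.setPreTaskVersion(1);
lineage.setPostTaskCode(11L);             // downstream task (example value)
lineage.setPostTaskVersion(1);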
......@@ -17,21 +17,14 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* process task relation
......
......@@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.dao.entity;
import java.util.Objects;
public class WorkFlowRelation {
private int sourceWorkFlowId;
private int targetWorkFlowId;
......@@ -35,4 +37,16 @@ public class WorkFlowRelation {
public void setTargetWorkFlowId(int targetWorkFlowId) {
this.targetWorkFlowId = targetWorkFlowId;
}
@Override
public boolean equals(Object obj) {
return obj instanceof WorkFlowRelation
&& this.sourceWorkFlowId == ((WorkFlowRelation) obj).getSourceWorkFlowId()
&& this.targetWorkFlowId == ((WorkFlowRelation) obj).getTargetWorkFlowId();
}
@Override
public int hashCode() {
return Objects.hash(sourceWorkFlowId, targetWorkFlowId);
}
}
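The equals/hashCode pair added above is what lets the service keep relations in a HashSet without duplicate edges. A quick illustration, using the no-arg constructor and setters the service itself relies on:

import java.util.HashSet;
import java.util.Set;

import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;

public class WorkFlowRelationDedupExample {
    public static void main(String[] args) {
        Set<WorkFlowRelation> relations = new HashSet<>();

        WorkFlowRelation first = new WorkFlowRelation();
        first.setSourceWorkFlowId(1);
        first.setTargetWorkFlowId(2);

        WorkFlowRelation duplicate = new WorkFlowRelation();
        duplicate.setSourceWorkFlowId(1);
        duplicate.setTargetWorkFlowId(2);

        relations.add(first);
        relations.add(duplicate); // rejected: equals/hashCode identify it as the same edge

        System.out.println(relations.size()); // prints 1
    }
}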
......@@ -16,9 +16,11 @@
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Set;
......@@ -26,24 +28,41 @@ public interface WorkFlowLineageMapper {
/**
* queryByName
*
* @param searchVal searchVal
* @param projectCode projectCode
* @return WorkFlowLineage list
*/
List<WorkFlowLineage> queryByName(@Param("searchVal") String searchVal, @Param("projectCode") Long projectCode);
/**
* queryByIds
* queryCodeRelation
*
* @param taskCode taskCode
* @param taskVersion taskVersion
* @param processDefinitionCode processDefinitionCode
* @param projectCode projectCode
* @return ProcessLineage list
*/
List<ProcessLineage> queryCodeRelation(
@Param("taskCode") Long taskCode, @Param("taskVersion") int taskVersion,
@Param("processDefinitionCode") Long processDefinitionCode, @Param("projectCode") Long projectCode);
/**
* queryRelationByIds
*
* @param ids ids
* @param projectCode projectCode
* @return ProcessLineage list
*/
List<WorkFlowLineage> queryByIds(@Param("ids") Set<Integer> ids, @Param("projectCode") Long projectCode);
List<ProcessLineage> queryRelationByIds(@Param("ids") Set<Integer> ids, @Param("projectCode") Long projectCode);
/**
* query SourceTarget
* @param id id
* @return WorkFlowRelation list
* queryWorkFlowLineageByCode
*
* @param processDefinitionCode processDefinitionCode
* @param projectCode projectCode
* @return WorkFlowLineage
*/
List<WorkFlowRelation> querySourceTarget(@Param("id") int id);
WorkFlowLineage queryWorkFlowLineageByCode(@Param("processDefinitionCode") Long processDefinitionCode, @Param("projectCode") Long projectCode);
}
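Taken together, the three new queries drive the lineage walk: queryRelationByIds fetches the seed relations for the selected definitions, queryCodeRelation finds relations in other definitions whose pre-task matches a seed's post-task (what the service code above treats as downstream definitions), and queryWorkFlowLineageByCode resolves a definition code into a displayable node. A condensed usage sketch, assuming an injected WorkFlowLineageMapper and example ids; result assembly and error handling are omitted:

// Condensed sketch of how the lineage service chains these queries.
Set<Integer> ids = Collections.singleton(1); // selected process definition ids (example value)
Long projectCode = 1L;                       // example project code
List<ProcessLineage> seeds = workFlowLineageMapper.queryRelationByIds(ids, projectCode);
for (ProcessLineage seed : seeds) {
    // resolve the seed definition into a displayable lineage node
    WorkFlowLineage node = workFlowLineageMapper
            .queryWorkFlowLineageByCode(seed.getProcessDefinitionCode(), projectCode);
    // relations whose pre-task matches this seed's post-task, in other definitions;
    // getRelation in WorkFlowLineageServiceImpl recurses over these
    List<ProcessLineage> downstream = workFlowLineageMapper.queryCodeRelation(
            seed.getPostTaskCode(), seed.getPostTaskVersion(),
            seed.getProcessDefinitionCode(), projectCode);
}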
......@@ -18,86 +18,65 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper">
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage">
select tepd.id as work_flow_id,tepd.name as work_flow_name
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.id = tes.process_definition_id
where tepd.project_code = #{projectCode}
<if test="searchVal != null and searchVal != ''">
and tepd.name like concat('%', #{searchVal}, '%')
</if>
</select>
<select id="queryByIds" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" databaseId="mysql">
select tepd.id as work_flow_id,tepd.name as work_flow_name,
(case when json_extract(tepd.process_definition_json, '$**.dependItemList') is not null then 1 else 0 end) as is_depend_work_flow,
json_extract(tepd.process_definition_json, '$**.definitionId') as source_work_flow_id,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.id = tes.process_definition_id
where tepd.project_code = #{projectCode}
<select id="queryRelationByIds" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select ptr.project_code,
ptr.post_task_code,
ptr.post_task_version,
ptr.pre_task_code,
ptr.pre_task_version,
ptr.process_definition_code,
ptr.process_definition_version
from t_ds_process_definition pd
join t_ds_process_task_relation ptr on pd.code = ptr.process_definition_code and pd.version =
ptr.process_definition_version
where pd.project_code = #{projectCode}
<if test="ids != null and ids.size()>0">
and tepd.id in
and pd.id in
<foreach collection="ids" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="queryByIds" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage" databaseId="pg">
select a.work_flow_id,
a.work_flow_name,
a.is_depend_work_flow,
array_agg(a.source_id) as source_id,
a.work_flow_publish_status,
a.schedule_start_time,
a.schedule_end_time,
a.crontab,
a.schedule_publish_status
from (
select tepd.id as work_flow_id,tepd.name as work_flow_name,
case when tepd.process_definition_json::json#>'{tasks,1,dependence}' is not null then 1 else 0 end as is_depend_work_flow,
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}') as source_id,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.id = tes.process_definition_id
where tepd.project_code = #{projectCode}
<if test="ids != null and ids.size()>0">
and tepd.id in
<foreach collection="ids" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
) a
where (a.is_depend_work_flow = 1 and source_id is not null) or (a.is_depend_work_flow = 0)
group by a.work_flow_id,a.work_flow_name,a.is_depend_work_flow,a.work_flow_publish_status,a.schedule_start_time,
a.schedule_end_time,a.crontab,a.schedule_publish_status
</select>
<select id="queryCodeRelation" resultType="org.apache.dolphinscheduler.dao.entity.ProcessLineage">
select project_code,
post_task_code,
post_task_version,
pre_task_code,
pre_task_version,
process_definition_code,
process_definition_version
from t_ds_process_task_relation ptr
where ptr.pre_task_code=#{taskCode}
and ptr.pre_task_version=#{taskVersion}
and ptr.process_definition_code!=#{processDefinitionCode}
and ptr.project_code =#{projectCode}
<select id="querySourceTarget" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation" databaseId="mysql">
select id as target_work_flow_id,#{id} as source_work_flow_id
from t_ds_process_definition t
where json_extract(t.process_definition_json, '$**.dependItemList') is not null
and find_in_set(#{id}, replace(replace(replace(json_extract(t.process_definition_json, '$**.definitionId'), '[', ''),']', ''), ' ', '')) > 0
</select>
<select id="querySourceTarget" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowRelation" databaseId="pg">
select a.work_flow_id as target_work_flow_id,
a.source_id as source_work_flow_id
from (
select tepd.id as work_flow_id,
(json_array_elements(tepd.process_definition_json::json#>'{tasks}')#>>'{dependence,dependTaskList,0,dependItemList,0,definitionId}') as source_id
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.id = tes.process_definition_id
where tepd.project_id = 1) a
where source_id = #{id}::text;
<select id="queryWorkFlowLineageByCode" resultType="org.apache.dolphinscheduler.dao.entity.WorkFlowLineage"
databaseId="mysql">
select tepd.id as work_flow_id,tepd.name as work_flow_name,
"" as source_work_flow_id,
tepd.release_state as work_flow_publish_status,
tes.start_time as schedule_start_time,
tes.end_time as schedule_end_time,
tes.crontab as crontab,
tes.release_state as schedule_publish_status
from t_ds_process_definition tepd
left join t_ds_schedules tes on tepd.id = tes.process_definition_id
where tepd.project_code = #{projectCode} and tepd.code = #{processDefinitionCode}
</select>
</mapper>
......@@ -44,19 +44,4 @@ public class WorkFlowLineageMapperTest {
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryByName("test",1L);
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@Test
public void testQueryByIds() {
Set<Integer> ids = new HashSet<>();
ids.add(1);
List<WorkFlowLineage> workFlowLineages = workFlowLineageMapper.queryByIds(ids,1L);
Assert.assertNotEquals(workFlowLineages.size(), 0);
}
@Test
public void testQuerySourceTarget() {
List<WorkFlowRelation> workFlowRelations = workFlowLineageMapper.querySourceTarget(1);
Assert.assertNotEquals(workFlowRelations.size(), 0);
}
}
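The two removed tests covered queryByIds and querySourceTarget, which no longer exist on the mapper. A sketch of possible replacement coverage for the new queries follows; the seeded ids, codes, and expected counts are assumptions about the test fixtures, not the project's actual data:

    @Test
    public void testQueryRelationByIds() {
        // assumes t_ds_process_definition / t_ds_process_task_relation contain
        // at least one definition with id = 1 under project code 1L
        Set<Integer> ids = new HashSet<>();
        ids.add(1);
        List<ProcessLineage> processLineages = workFlowLineageMapper.queryRelationByIds(ids, 1L);
        Assert.assertNotEquals(processLineages.size(), 0);
    }

    @Test
    public void testQueryWorkFlowLineageByCode() {
        // assumes a definition with code 1L exists in project 1L
        WorkFlowLineage workFlowLineage = workFlowLineageMapper.queryWorkFlowLineageByCode(1L, 1L);
        Assert.assertNotNull(workFlowLineage);
    }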