diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java new file mode 100644 index 0000000000000000000000000000000000000000..9ee19f10be5ed2e451e367fcd283067e80c511a8 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskChecker.java @@ -0,0 +1,34 @@ +package org.apache.dolphinscheduler.api.checker; + +import lombok.NonNull; +import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.List; + +public interface IsolationTaskChecker { + + void checkCanSubmitTaskIsolation(@NonNull User loginUser, + long projectCode, + ProcessInstance processInstance, + @NonNull List voList); + + void checkCanOnlineTaskIsolation(@NonNull User loginUser, + long projectCode, + @NonNull ProcessInstance processInstance, + @NonNull IsolationTask isolationTask); + + void checkCanListingTaskIsolation(@NonNull User loginUser, + long projectCode); + + void checkCanDeleteTaskIsolation(@NonNull User loginUser, + long projectCode, + long isolationId); + + void checkCanCancelTaskIsolation(@NonNull User loginUser, + long projectCode, + ProcessInstance processInstance, + @NonNull IsolationTask isolationTasks); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..99402f7fb6bc5e63017afd176dfffe099fd5a566 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/IsolationTaskCheckerImpl.java @@ -0,0 +1,206 @@ 
+package org.apache.dolphinscheduler.api.checker; + +import lombok.NonNull; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.WorkflowDAGService; +import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; +import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT; +import static 
org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT; +import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST; +import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST; + +@Component +public class IsolationTaskCheckerImpl implements IsolationTaskChecker { + + @Autowired + private ProjectService projectService; + + @Autowired + private ProcessDefinitionLogDao processDefinitionLogDao; + + @Autowired + private WorkflowDAGService workflowDAGService; + + @Autowired + private IsolationTaskDao isolationTaskDao; + + @Override + public void checkCanSubmitTaskIsolation(@NonNull User loginUser, + long projectCode, + ProcessInstance processInstance, + @NonNull List voList) { + Project project = projectService.queryByCode(loginUser, projectCode); + if (processInstance == null) { + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST); + } + + ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao + .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(), + 
processInstance.getProcessDefinitionVersion()) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName())); + checkSubmitIsolateTaskAuth(loginUser, project, processDefinitionLog); + + DAG workflowDAG = workflowDAGService.getWorkflowDAG(processDefinitionLog); + checkWorkflowInstanceCanSubmitIsolateTask(processInstance.getName(), processInstance); + + checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG); + } + + @Override + public void checkCanOnlineTaskIsolation(@NonNull User loginUser, + long projectCode, + ProcessInstance processInstance, + @NonNull IsolationTask isolationTask) { + Project project = projectService.queryByCode(loginUser, projectCode); + checkOnlineIsolationTaskAuth(loginUser, project, processInstance); + + if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) { + throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE); + } + + String workflowInstanceName = isolationTask.getWorkflowInstanceName(); + checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance); + } + + @Override + public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) { + Project project = projectService.queryByCode(loginUser, projectCode); + + projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST); + } + + @Override + public void checkCanDeleteTaskIsolation(@NonNull User loginUser, + long projectCode, + long isolationId) { + Project project = projectService.queryByCode(loginUser, projectCode); + projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE); + + IsolationTask isolationTask = isolationTaskDao.queryById(isolationId) + .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST)); + if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) { + throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE); + } + } + + @Override + public void 
checkCanCancelTaskIsolation(@NonNull User loginUser, + long projectCode, + ProcessInstance processInstance, + @NonNull IsolationTask isolationTask) { + if (processInstance == null) { + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST); + } + Project project = projectService.queryByCode(loginUser, projectCode); + checkCancelIsolationTaskAuth(loginUser, project, processInstance); + + if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) { + throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL); + } + } + + private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName, + ProcessInstance processInstance) { + // check if the given workflow instance can do isolate operation + // If the workflow instance is at ready_xx status, it cannot do isolate operation + if (processInstance == null) { + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceName); + } + if (processInstance.getState().typeIsReady()) { + throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT, workflowInstanceName); + } + } + + private void checkTaskNodeCanSubmitIsolateTask(@NonNull List voList, + @NonNull DAG workflowDAG) { + for (IsolationTaskExcelParseVO vo : voList) { + String taskCodeStr = Long.toString(vo.getTaskCode()); + // check if the taskNode exist in DAG + if (!workflowDAG.containsNode(taskCodeStr)) { + throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST, vo.getTaskName()); + } + // check if the pre task exist an online isolation task + // if existed, we cannot create a new isolation task + Set allPreNodes = DagHelper.getAllPreNodes(taskCodeStr, workflowDAG); + List isolationTasks = isolationTaskDao.queryByTaskCodes(vo.getWorkflowInstanceId(), + allPreNodes.stream().map(Long::parseLong).collect(Collectors.toList())); + // todo: Do we need to support if the sub isolation task is offline? 
+ if (CollectionUtils.isNotEmpty(isolationTasks)) { + throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK); + } + } + } + + private void checkSubmitIsolateTaskAuth(@NonNull User loginUser, + @NonNull Project project, + @NonNull ProcessDefinitionLog processDefinitionLog) { + if (processDefinitionLog.getProjectCode() != project.getCode()) { + throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT); + } + projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT); + } + + private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) { + if (processInstance == null) { + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST); + } + ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao + .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName())); + if (processDefinitionLog.getProjectCode() != project.getCode()) { + throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR); + } + + projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE); + } + + private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) { + if (processInstance == null) { + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST); + } + ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao + .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName())); + if (processDefinitionLog.getProjectCode() != project.getCode()) { + throw new 
ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR); + } + projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_CANCEL); + + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java index 1805934628aa267646f8f9c8f8bcf6fb82275575..05e708d3b09e8b2c3ba2080a22ff5829d9a8882d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/checker/WorkflowInstanceCheckerImpl.java @@ -2,11 +2,11 @@ package org.apache.dolphinscheduler.api.checker; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProjectService; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -14,13 +14,12 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.enums.Status.CLEAN_TASK_INSTANCE_ERROR_WORKFLOW_INSTANCE_IS_RUNNING; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST; -import static org.apache.dolphinscheduler.api.enums.Status.PROJECT_NOT_EXIST; @Component 
public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker { @Autowired - private ProcessDefinitionDao processDefinitionDao; + private ProcessDefinitionLogDao processDefinitionLogDao; @Autowired private ProjectService projectService; @@ -30,16 +29,13 @@ public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker { if (processInstance == null) { throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST); } - // todo: check permission - ProcessDefinition processDefinition = - processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode()); - if (processDefinition == null) { - throw new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getProcessDefinitionCode()); - } - Project project = projectService.queryByCode(loginUser, processDefinition.getProjectCode()); - if (project == null) { - throw new ServiceException(PROJECT_NOT_EXIST); - } + ProcessDefinitionLog processDefinitionLog = + processDefinitionLogDao + .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, + processInstance.getProcessDefinitionCode())); + Project project = projectService.queryByCode(loginUser, processDefinitionLog.getProjectCode()); projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CLEAN_TASK_INSTANCE_STATE); // check state diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java index fba7677ab7f090d42a4c07f6f29e4013c5c39071..c8cdd2cb58548a30ebf7be3c12be74579e59d076 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java +++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/constants/ApiFuncIdentificationConstant.java @@ -96,6 +96,11 @@ public class ApiFuncIdentificationConstant { public static final String INSTANCE_BATCH_DELETE = "project:process-instance:batch-delete"; public static final String FORCED_SUCCESS = "project:task-instance:force-success"; public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state"; + public static final String ISOLATION_TASK_LIST = "project:isolation-task:list"; + public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online"; + public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit"; + public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel"; + public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete"; public static final String VIEW_LOG = "project:log:detail"; public static final String DOWNLOAD_LOG = "project:log:download-log"; public static final String PROJECT_OVERVIEW = "project:overview:view"; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java new file mode 100644 index 0000000000000000000000000000000000000000..f4b445456de3200d172719c0d6ce06264dc36657 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/IsolationTaskController.java @@ -0,0 +1,96 @@ +package org.apache.dolphinscheduler.api.controller; + +import io.swagger.annotations.Api; +import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest; +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import 
org.apache.dolphinscheduler.api.service.IsolationTaskService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.entity.User; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; +import springfox.documentation.annotations.ApiIgnore; + +import javax.validation.Valid; + +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR; + +@Api(tags = "ISOLATION_TASK_TAG") +@RestController +@RequestMapping("/projects/{projectCode}/isolation-task") +public class IsolationTaskController { + + @Autowired + private IsolationTaskService isolationTaskService; + + @PostMapping(value = "/submit") + @ResponseStatus(HttpStatus.OK) + @ApiException(ISOLATION_TASK_SUBMIT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result submitIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @RequestBody 
IsolationTaskSubmitRequest isolationTaskSubmitRequest) { + isolationTaskService.submitTaskIsolations(loginUser, projectCode, isolationTaskSubmitRequest); + return Result.success(null); + } + + @PutMapping(value = "/online/{id}") + @ResponseStatus(HttpStatus.OK) + @ApiException(ISOLATION_TASK_SUBMIT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @PathVariable(name = "id") long isolationId) { + isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId); + return Result.success(null); + } + + @PutMapping(value = "/cancel/{id}") + @ResponseStatus(HttpStatus.OK) + @ApiException(ISOLATION_TASK_SUBMIT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result cancelIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @PathVariable(name = "id") long isolationId) { + isolationTaskService.cancelTaskIsolation(loginUser, projectCode, isolationId); + return Result.success(null); + } + + @GetMapping("") + @ResponseStatus(HttpStatus.OK) + @ApiException(ISOLATION_TASK_LISTING_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @RequestBody @Valid IsolationTaskListingRequest request) { + return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request)); + } + + @DeleteMapping("/{id}") + @ResponseStatus(HttpStatus.OK) + @ApiException(ISOLATION_TASK_DELETE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @PathVariable long id) { + 
isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id); + return Result.success(null); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java index c9afc8fabd455daa3a9ac74c9e7d93c07953d05d..3fe81f7a712e30ae9c42fed8e5a21ba2ccc08b51 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/CleanTaskInstanceStateRequest.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import javax.validation.constraints.NotEmpty; import java.util.List; @Data @@ -11,5 +12,6 @@ import java.util.List; @AllArgsConstructor public class CleanTaskInstanceStateRequest { + @NotEmpty private List taskInstanceIds; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..a7e00677b52452eb65ef976442dffc07aa171e56 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskCancelRequest.java @@ -0,0 +1,17 @@ +package org.apache.dolphinscheduler.api.dto.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotEmpty; +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class IsolationTaskCancelRequest { + + @NotEmpty + private List isolationIds; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..82104c3c9d08d74a9d750453b308ac63a0776f28 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskDeleteRequest.java @@ -0,0 +1,17 @@ +package org.apache.dolphinscheduler.api.dto.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotEmpty; +import java.util.List; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IsolationTaskDeleteRequest { + + @NotEmpty + private List isolationIds; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..fecd65e9c60cb1bb6b63ed66ad3472d1105089f3 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskListingRequest.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.api.dto.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IsolationTaskListingRequest { + + private String workflowInstanceName; + + private String taskName; + + @NotNull(message = "page number cannot be null") + private Integer pageNo; + + @NotNull(message = "page size cannot be null") + private Integer pageSize; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..d89b681eafd21b5765544dff8a257ec5cfa01978 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/request/IsolationTaskSubmitRequest.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.api.dto.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO; + +import javax.validation.constraints.NotEmpty; +import java.util.List; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IsolationTaskSubmitRequest { + + @NotEmpty + private List isolationTaskExcelParseVOList; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 37f89b406f5093e003bbd6c5319bbb8798436349..602cb477dd6d524413b7a6c02e24ae45e8a14063 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -513,7 +513,44 @@ public enum Status { CONFIG_FILTER_EMPTY(1400007, "complement time is empty after filtering according to the configuration", "当前补数时间根据配置过滤后为空"), PROJECT_NAME_TOO_LONG_ERROR(1400008, "project name is too long error", "项目名称过长"), - ; + + ISOLATION_TASK_NOT_EXIST(1500000, "Isolation task not exist", "隔离任务不存在"), + ISOLATION_TASK_SUBMIT_ERROR(1500100, "Submit isolation task error", "提交隔离任务异常"), + ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT(1500101, + "Submit isolation task error, relate workflow instance [{0}] is not support", + "提交隔离任务异常, 关联的工作流实例:[{0}]暂不支持该操作"), + ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1500102, + "Submit isolation task error, 
relate workflow instance [{0}] is not belong to current project", + "提交隔离任务异常, 关联的工作流实例:[{0}]不属于当前项目"), + ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST(1500103, "Submit isolation task error, task: [{0}] is not exist", + "提交隔离任务异常, 任务不存在:[{0}]"), + ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK(1500104, + "Submit isolation task error, workflow instance: [{0}] exist an sub isolation task", + "提交隔离任务异常, 工作流实例已经存在:[{0}]子隔离任务"), + ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500105, + "Submit isolation task error, send request to master error", + "提交隔离任务异常,发送请求给Master异常"), + + ISOLATION_TASK_ONLINE_ERROR(1500200, "Online isolation task error", "上线隔离任务异常"), + ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE(1500201, + "Online isolation task error, the isolation task is already online", "上线隔离任务异常,该任务已处于隔离中"), + ISOLATION_TASK_ONLINE_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500202, + "Online isolation task error, send request to master error", "上线隔离任务异常,发送隔离请求给Master异常"), + ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500203, + "Online isolation task error, the process instance is not belongs to project", "上线隔离任务异常,当前工作流实例不属于目标任务"), + + ISOLATION_TASK_CANCEL_ERROR(1500300, "Cancel isolation task error", "取消隔离任务异常"), + ISOLATION_TASK_CANCEL_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500301, + "Cancel isolation task error, send request to master error", "取消隔离任务异常,发送隔离请求给Master异常"), + ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL(1500302, + "Cancel isolation task error, this task isolation is already been cancel", "取消隔离任务异常,该隔离已经下线"), + ISOLATION_TASK_CANCEL_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500303, + "Cancel isolation task error, the process instance is not belongs to project", "取消隔离任务异常,当前工作流实例不属于目标任务"), + ISOLATION_TASK_LISTING_ERROR(1500400, "Listing isolation task error", "查询隔离任务列表异常"), + ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"), + ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete 
isolation task error, the task status is not offline", + "删除隔离任务异常,该隔离任务尚未下线"), + ; private final int code; private final String enMsg; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java new file mode 100644 index 0000000000000000000000000000000000000000..ef3634cd520280d632418a5ada3c5ad779ce5585 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/remote/ApiServerRPCClient.java @@ -0,0 +1,35 @@ +package org.apache.dolphinscheduler.api.remote; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ApiServerRPCClient { + + private final NettyRemotingClient client; + + private static final long DEFAULT_TIME_OUT_MILLS = 1_000L; + + public ApiServerRPCClient() { + client = new NettyRemotingClient(new NettyClientConfig()); + log.info("Success initialized ApiServerRPCClient..."); + } + + public void sendSyncCommand(@NonNull Host host, + @NonNull Command rpcCommand) throws RemotingException, InterruptedException { + sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS); + } + + public void sendSyncCommand(@NonNull Host host, + @NonNull Command rpcCommand, + long timeoutMills) throws RemotingException, InterruptedException { + client.sendSync(host, rpcCommand, timeoutMills); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java new file mode 100644 index 0000000000000000000000000000000000000000..7225d19007cb19ba5e77d5a8774d25175cd8d356 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/IsolationTaskService.java @@ -0,0 +1,32 @@ +package org.apache.dolphinscheduler.api.service; + +import lombok.NonNull; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.entity.User; + +public interface IsolationTaskService { + + void submitTaskIsolations(@NonNull User loginUser, + long projectCode, + @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest); + + void onlineTaskIsolation(@NonNull User loginUser, + long projectCode, + long isolationTaskId); + + void cancelTaskIsolation(@NonNull User loginUser, + long projectCode, + long isolationId); + + PageInfo listingTaskIsolation(@NonNull User loginUser, + long projectCode, + @NonNull IsolationTaskListingRequest request); + + void deleteTaskIsolation(@NonNull User loginUser, + long projectCode, + long id); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 58baef26a6984ddea08a3b407c0db7e3c571f4f1..f3a9069a72d12d67e85beb13f14ed1180f513afe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -18,16 +18,16 @@ package 
org.apache.dolphinscheduler.api.service; -import java.io.IOException; -import java.util.List; -import java.util.Map; - import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import java.io.IOException; +import java.util.List; +import java.util.Map; + /** * process instance service */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java index 716e2fba877da15a1a36c609c0148bc820e15b56..64fcf8fd994175dac7e993808649d30e4b26e4b9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import lombok.NonNull; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.User; @@ -44,6 +45,7 @@ public interface ProjectService { * @param projectCode project code * @return project detail information */ + @NonNull Project queryByCode(User loginUser, long projectCode); /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java new file mode 100644 index 0000000000000000000000000000000000000000..739c11a616485ca5c1853f806f9ea8055be2f954 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowDAGService.java @@ -0,0 +1,15 @@ +package 
org.apache.dolphinscheduler.api.service; + +import lombok.NonNull; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; + +public interface WorkflowDAGService { + + DAG getWorkflowDAG(@NonNull Long processDefinitionCode, + @NonNull Integer processDefinitionVersion); + + DAG getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..4f6ff908b1e3bdade352bc1b08dab3efe52572e0 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/IsolationTaskServiceImpl.java @@ -0,0 +1,246 @@ +package org.apache.dolphinscheduler.api.service.impl; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.dolphinscheduler.api.checker.IsolationTaskChecker; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest; +import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient; +import org.apache.dolphinscheduler.api.service.IsolationTaskService; +import org.apache.dolphinscheduler.api.service.WorkflowDAGService; +import org.apache.dolphinscheduler.api.transformer.CommandTransformer; +import org.apache.dolphinscheduler.api.utils.PageInfo; 
+import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST; +import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR; + +@Slf4j +@Service +public 
class IsolationTaskServiceImpl implements IsolationTaskService { + + @Autowired + private IsolationTaskDao isolationTaskDao; + + @Autowired + private IsolationTaskChecker isolationTaskChecker; + + @Autowired + private ProcessInstanceDao processInstanceDao; + + @Autowired + private CommandTransformer commandTransformer; + + @Autowired + private ApiServerRPCClient apiServerRPCClient; + + @Autowired + private RegistryClient registryClient; + + @Autowired + private ProcessService processService; + + @Autowired + private WorkflowDAGService workflowDAGService; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Override + @Transactional + public void submitTaskIsolations(@NonNull User loginUser, + long projectCode, + @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) { + Map> workflow2VoMap = + isolationTaskSubmitRequest.getIsolationTaskExcelParseVOList() + .stream().collect( + HashMap::new, (map, vo) -> { + map.computeIfAbsent(vo.getWorkflowInstanceId(), k -> new ArrayList<>()).add(vo); + }, + Map::putAll); + + List>> needToOnlineIsolations = new ArrayList<>(); + List needToInsertIntoDB = new ArrayList<>(); + for (Map.Entry> entry : workflow2VoMap.entrySet()) { + Integer workflowInstanceId = entry.getKey(); + List vos = entry.getValue(); + ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId); + isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos); + + List isolationTasks = entry.getValue().stream().map(vo -> { + return IsolationTask.builder() + .workflowInstanceId(vo.getWorkflowInstanceId()) + .workflowInstanceName(vo.getWorkflowInstanceName()) + .taskName(vo.getTaskName()) + .taskCode(vo.getTaskCode()) + .status(IsolationTaskStatus.ONLINE.getCode()) + .build(); + }).collect(Collectors.toList()); + needToInsertIntoDB.addAll(isolationTasks); + needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks)); + } + 
isolationTaskDao.batchInsert(needToInsertIntoDB); + // we split here to avoid rollback RPC request + try { + refreshIsolationTasks(); + } catch (Exception ex) { + throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR); + } + } + + @Override + @Transactional + public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) { + IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId) + .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST)); + ProcessInstance processInstance = + processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId()); + + isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask); + isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE); + try { + refreshIsolationTasks(); + } catch (Exception ex) { + throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR); + } + } + + @Override + @Transactional + public void cancelTaskIsolation(@NonNull User loginUser, + long projectCode, + long isolationId) { + IsolationTask isolationTask = isolationTaskDao.queryById(isolationId) + .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST)); + + Integer workflowInstanceId = isolationTask.getWorkflowInstanceId(); + ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId); + isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask); + isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE); + insertRecoveryCommandIfNeeded(processInstance, isolationTask); + try { + refreshIsolationTasks(); + } catch (RemotingException | InterruptedException e) { + throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR); + } + } + + @Override + public PageInfo listingTaskIsolation(@NonNull User loginUser, + long projectCode, + @NonNull 
IsolationTaskListingRequest request) { + isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode); + + Integer pageNo = request.getPageNo(); + Integer pageSize = request.getPageSize(); + + IPage iPage = isolationTaskDao.pageQueryIsolationTask( + request.getWorkflowInstanceName(), + request.getTaskName(), + pageNo, + pageSize); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotal((int) iPage.getTotal()); + pageInfo.setTotalList(iPage.getRecords()); + return pageInfo; + } + + @Override + public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) { + isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id); + int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE); + if (deleteNum <= 0) { + throw new ServiceException(ISOLATION_TASK_NOT_EXIST); + } + } + + private void refreshIsolationTasks() throws RemotingException, InterruptedException { + List masters = registryClient.getServerList(NodeType.MASTER); + if (CollectionUtils.isEmpty(masters)) { + return; + } + + org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest = + new RefreshIsolationTaskRequest().convert2Command(); + for (Server master : masters) { + try { + apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()), + refreshIsolationRequest); + } catch (RemotingException | InterruptedException e) { + log.error("Send RefreshIsolationTask request to master error, master: {}", master, e); + throw e; + } + } + } + + private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance, + @NonNull IsolationTask isolationTask) { + if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) { + return; + } + int workflowInstanceId = processInstance.getId(); + // find the isolationTaskInstanceIds need to recovery + // find the sub node is in pause or kill + DAG workflowDAG = workflowDAGService.getWorkflowDAG( + 
processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + + List taskInstances = + taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId); + Set onlineIsolationTaskCodes = + isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE) + .stream() + .map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode())) + .collect(Collectors.toSet()); + + List canRecoveryTaskInstances = taskInstances.stream() + .filter(taskInstance -> taskInstance.getState().typeIsIsolated()) + .filter(taskInstance -> !DagHelper.isChildOfAnyParentNodes(String.valueOf(taskInstance.getTaskCode()), + onlineIsolationTaskCodes, workflowDAG)) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) { + return; + } + // find if this taskInstance still exist pre isolationTasks + Command command = commandTransformer.transformToRecoveryFromTaskIsolationCommand(processInstance, + canRecoveryTaskInstances); + processService.createCommand(command); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java index 7f5bdf086bf7223e6343e6fa244a9dead014966a..3720956c453ef8a0ec566e61bcb32eedc7c0d7fc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.NonNull; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import 
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; @@ -168,7 +169,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic * @return project detail information */ @Override - public Project queryByCode(User loginUser, long projectCode) { + public @NonNull Project queryByCode(User loginUser, long projectCode) { Map result = new HashMap<>(); Project project = projectMapper.queryByCode(projectCode); hasProjectAndPerm(loginUser, project, result, PROJECT); @@ -204,10 +205,8 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic perm)) { // check read permission Project checkProject = projectMapper.queryByCode(projectCode); - putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(), + throw new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(), Objects.nonNull(checkProject) ? project.getName() : projectCode); - } else { - putMsg(result, Status.SUCCESS); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index f6da23ac0f29af87a2ea7b79a0010dea9f0409bf..9d1dc166a2d606b2fab8ac1fe5ba4a25097cf7d6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.extern.slf4j.Slf4j; +import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import 
org.apache.dolphinscheduler.api.service.ProcessInstanceService; @@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -41,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -89,6 +92,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Autowired private TaskInstanceDao taskInstanceDao; + @Autowired + private ProcessInstanceDao processInstanceDao; + + @Autowired + private WorkflowInstanceChecker workflowInstanceChecker; + @Autowired private CommandTransformer commandTransformer; @@ -219,7 +228,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst List existTaskInstances = taskInstanceDao.queryTaskInstanceByIds(taskInstanceIds); TaskInstanceValidator.validateTaskInstanceAllExists(taskInstanceIds, existTaskInstances); - Map> processInstanceId2TaskInstanceIds = + Map> processInstanceId2TaskInstanceIdMap = existTaskInstances.stream().collect( HashMap::new, (map, taskInstance) -> { @@ -228,11 +237,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst }, 
Map::putAll); - List cleanStateCommands = commandTransformer.transformToCleanTaskInstanceStateCommands(loginUser, - processInstanceId2TaskInstanceIds); - // todo: use batch create and remove the transactional - cleanStateCommands.forEach(command -> processService.createCommand(command)); + for (Map.Entry> processInstanceId2TaskInstanceIds : processInstanceId2TaskInstanceIdMap + .entrySet()) { + Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey(); + List needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue(); + ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId); + workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance); + Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance, + needToCleanStateTaskInstanceIds); + processService.createCommand(command); + } } catch (ServiceException serviceException) { throw serviceException; } catch (Exception ex) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..f1ffb0f64dfebb0efa6375b74a2b71eb1ded64db --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDagServiceImpl.java @@ -0,0 +1,64 @@ +package org.apache.dolphinscheduler.api.service.impl; + +import lombok.NonNull; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.WorkflowDAGService; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import 
org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; +import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.List; + +import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST; + +@Service +public class WorkflowDagServiceImpl implements WorkflowDAGService { + + @Autowired + private ProcessDefinitionLogDao processDefinitionLogDao; + + // todo: use dao + @Autowired + private ProcessService processService; + + @Override + public DAG getWorkflowDAG(@NonNull Long processDefinitionCode, + @NonNull Integer processDefinitionVersion) { + ProcessDefinitionLog processDefinitionLog = + processDefinitionLogDao.queryProcessDefinitionByCode(processDefinitionCode, processDefinitionVersion) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processDefinitionCode)); + return getWorkflowDAG(processDefinitionLog); + } + + @Override + public DAG getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog) { + List processTaskRelations = + processService.findRelationByCode(processDefinitionLog.getCode(), processDefinitionLog.getVersion()); + List taskDefinitionLogs = + processService.getTaskDefineLogListByRelation(processTaskRelations); + List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); + + ProcessDag processDag = DagHelper.generateFlowDag( + taskNodeList, + Collections.emptyList(), + Collections.emptyList(), + 
TaskDependType.TASK_POST); + if (processDag == null) { + throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY); + } + + return DagHelper.buildDagGraph(processDag); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java index 51865effaf15cd4c08fef878dfd1bf905377f533..986c2b99a60ad5975f1ecd6bc98a63ec2cebbc74 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformer.java @@ -2,17 +2,16 @@ package org.apache.dolphinscheduler.api.transformer; import lombok.NonNull; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import java.util.List; -import java.util.Map; public interface CommandTransformer { - List transformToCleanTaskInstanceStateCommands(@NonNull User loginUser, - @NonNull Map> workflowInstanceId2TaskInstanceIds); + Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance, + @NonNull List needCleanTaskInstanceIds); - Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser, - @NonNull Integer workflowInstanceId, - @NonNull List needCleanTaskInstances); + Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance, + @NonNull List canRecoveryTaskInstances); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java index 2a0e46033e19950ac34e305c061cce7a93f9e9ad..9d8aadff4388c25281edbafd6b3b812d1c32c2f9 
100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/transformer/CommandTransformerImpl.java @@ -1,53 +1,59 @@ package org.apache.dolphinscheduler.api.transformer; import lombok.NonNull; -import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; -import org.springframework.beans.factory.annotation.Autowired; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS; @Component public class CommandTransformerImpl implements CommandTransformer { - @Autowired - private ProcessInstanceDao processInstanceDao; + @Override + public Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance, + @NonNull List needCleanTaskInstanceIds) { - @Autowired - private WorkflowInstanceChecker workflowInstanceChecker; + Command command = new Command(); + command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS); + command.setExecutorId(processInstance.getExecutorId()); + 
command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); + command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); + // we set the task post strategy, since we will go through the post task if the post task is not running + command.setTaskDependType(TaskDependType.TASK_POST); + command.setFailureStrategy(processInstance.getFailureStrategy()); + command.setWarningType(processInstance.getWarningType()); + command.setWarningGroupId(processInstance.getWarningGroupId()); + command.setProcessInstancePriority(processInstance.getProcessInstancePriority()); + command.setWorkerGroup(processInstance.getWorkerGroup()); + command.setEnvironmentCode(processInstance.getEnvironmentCode()); + command.setDryRun(processInstance.getDryRun()); - @Override - public List transformToCleanTaskInstanceStateCommands(@NonNull User loginUser, - @NonNull Map> workflowInstanceId2TaskInstanceIds) { - List commands = new ArrayList<>(workflowInstanceId2TaskInstanceIds.size()); - workflowInstanceId2TaskInstanceIds.forEach((workflowInstanceId, taskInstanceIds) -> { - Command command = transformToCleanTaskInstanceStateCommand(loginUser, workflowInstanceId, taskInstanceIds); - commands.add(command); - }); - return commands; + // todo:use pojo to represent CommandParam rather than map + Map commandParamMap = new HashMap<>(); + commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstanceIds)); + command.setCommandParam(JSONUtils.toJsonString(commandParamMap)); + command.setProcessInstanceId(processInstance.getId()); + return command; } @Override - public Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser, - @NonNull Integer workflowInstanceId, - @NonNull List needCleanTaskInstances) { - ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId); - workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, processInstance); - + public Command 
transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance, + @NonNull List canRecoveryIsolationTaskInstances) { Command command = new Command(); - command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS); + command.setCommandType(CommandType.RECOVERY_FROM_ISOLATION_TASKS); + command.setExecutorId(processInstance.getExecutorId()); command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); @@ -62,10 +68,22 @@ public class CommandTransformerImpl implements CommandTransformer { command.setDryRun(processInstance.getDryRun()); // todo:use pojo to represent CommandParam rather than map + List recoveryPausedIsolationIds = canRecoveryIsolationTaskInstances.stream() + .filter(taskInstance -> taskInstance.getState().typeIsPauseByIsolation()) + .map(TaskInstance::getId) + .collect(Collectors.toList()); + List recoveryKilledIsolationIds = canRecoveryIsolationTaskInstances.stream() + .filter(taskInstance -> taskInstance.getState().typeIsKilledByIsolation()) + .map(TaskInstance::getId) + .collect(Collectors.toList()); Map commandParamMap = new HashMap<>(); - commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstances)); + commandParamMap.put(CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS, + JSONUtils.toJsonString(recoveryPausedIsolationIds)); + commandParamMap.put(CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS, + JSONUtils.toJsonString(recoveryKilledIsolationIds)); command.setCommandParam(JSONUtils.toJsonString(commandParamMap)); command.setProcessInstanceId(processInstance.getId()); return command; } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java new file mode 100644 index 
0000000000000000000000000000000000000000..be63400aed9cd9b6e52d83811a117035e1f34889 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/vo/IsolationTaskExcelParseVO.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.api.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotNull; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class IsolationTaskExcelParseVO { + + @NotNull(message = "Workflow instance id cannot be null") + private Integer workflowInstanceId; + + @NotNull(message = "Workflow instance name cannot be null") + private String workflowInstanceName; + + @NotNull(message = "Task code cannot be null") + private Long taskCode; + + @NotNull(message = "Task name cannot be null") + private String taskName; +} diff --git a/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..f334cc6e5e78cc153a832bac3590fdf97da8d608 Binary files /dev/null and b/dolphinscheduler-api/src/main/resources/static/excel-en.xlsx differ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 0499d43c3085fb594416a58b90f1f48ad5bd2204..efffbc38429455e943b7669470c71b001b8448ab 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -338,6 +338,9 @@ public final class Constants { public static final String CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS = "CleanStateTaskInstanceIds"; + public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds"; + public static final String 
/**
 * Status of an isolation task, identified by an integer code.
 */
public enum IsolationTaskStatus {

    /** Code 0. */
    ONLINE(0),
    /** Code 1. */
    OFFLINE(1),
    ;

    private final int code;

    IsolationTaskStatus(int code) {
        this.code = code;
    }

    /**
     * @return the integer code of this status
     */
    public int getCode() {
        return code;
    }

    /**
     * Resolves a status from its integer code.
     *
     * @param code integer code of the status
     * @return the status whose code equals {@code code}
     * @throws IllegalArgumentException if no status has the given code
     */
    public static IsolationTaskStatus of(int code) {
        for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
            if (value.getCode() == code) {
                return value;
            }
        }
        throw new IllegalArgumentException(String.format("Invalid isolation task status code: %d", code));
    }
}
+1,32 @@ +package org.apache.dolphinscheduler.dao.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.NonNull; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +public interface IsolationTaskMapper extends BaseMapper { + + IPage pageQuery(@Param("workflowInstanceName") String workflowInstanceName, + @Param("taskName") String taskName, + IPage page); + + List queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId, + @NonNull @Param("taskCodes") List taskCodes); + + void updateIsolationTaskStatus(@Param("id") long isolationTaskId, + @Param("status") int status); + + int deleteByIdAndStatus(@Param("id") long isolationTaskId, + @Param("status") int status); + + List queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId, + @Param("status") int status); + + void batchInsert(@Param("isolationTasks") List isolationTasks); + + List queryByStatus(@Param("status") int code); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java new file mode 100644 index 0000000000000000000000000000000000000000..de16a92be0def01af0dd9b546f2cebd13cef5d36 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IsolationTaskDao.java @@ -0,0 +1,39 @@ +package org.apache.dolphinscheduler.dao.repository; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.NonNull; +import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public interface IsolationTaskDao { + + /** + * Page query, pageNumber is start from 0. 
+ */ + IPage pageQueryIsolationTask(String workflowInstanceName, + String taskName, + int pageNumber, + int pageSize); + + List queryByTaskCodes(Integer workflowInstanceId, List taskCodes); + + List queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus); + + Optional queryById(long isolationTaskId); + + List queryByIds(List isolationTaskIds); + + List queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus); + + int deleteByIdAndStatus(long id, IsolationTaskStatus status); + + void insert(IsolationTask isolationTaskDTO); + + void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus); + + void batchInsert(List isolationTasks); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java new file mode 100644 index 0000000000000000000000000000000000000000..a8eba8798956b2ed4b40aab2d34f04027f0dc958 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.dao.repository; + +import lombok.NonNull; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; + +import java.util.Optional; + +public interface ProcessDefinitionLogDao { + + Optional queryProcessDefinitionByCode(@NonNull Long processDefinitionCode, + @NonNull Integer processDefinitionVersion); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index f21851019145801961eb09a3a459bf9959f44ac1..e789101783f17e65da296dce50b78b3ccb7a193b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -38,4 +38,5 @@ public interface ProcessInstanceDao { * @param processInstance processInstance */ int upsertProcessInstance(ProcessInstance processInstance); + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java new file mode 100644 index 0000000000000000000000000000000000000000..6a606e3aefefa8d17a4187ac7fa4bfcf436ba896 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java @@ -0,0 +1,11 @@ +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; + +import java.util.List; + +public interface ProcessTaskRelationLogDao { + + List queryProcessTaskRelationLogByWorkflow(long workflowDefinitionCode, + int workflowDefinitionVersion); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java new file mode 100644 index 0000000000000000000000000000000000000000..5c8d55b005dfab003d9d08c51e7f02554f9d6a1f --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; + +import java.util.List; + +public interface TaskDefinitionLogDao { + + List queryTaskDefinitionByRelations(List processTaskRelations); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index c5b75fb0d9bc79d2f160737d831373a0c6e59dec..3918ad8386141c7b8ed5bacb62082a8ca845cf0d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -10,6 +10,8 @@ public interface TaskInstanceDao { List queryTaskInstanceByIds(List taskInstanceId); + List queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId); + /** * Update the taskInstance, if update failed will throw exception. * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..effab3aa16ac7189801e5bf3162d8edf73b93185 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/IsolationTaskDaoImpl.java @@ -0,0 +1,86 @@ +package org.apache.dolphinscheduler.dao.repository.impl; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +@Slf4j +@Repository +public class IsolationTaskDaoImpl implements 
IsolationTaskDao { + + @Autowired + private IsolationTaskMapper isolationTaskMapper; + + @Override + public IPage pageQueryIsolationTask(String workflowInstanceName, + String taskName, + int pageNumber, + int pageSize) { + Page page = new Page<>(pageNumber, pageSize); + return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page); + } + + @Override + public List queryByTaskCodes(Integer workflowInstanceId, List taskCodes) { + if (CollectionUtils.isEmpty(taskCodes)) { + return Collections.emptyList(); + } + return isolationTaskMapper.queryByTaskCodes(workflowInstanceId, taskCodes); + } + + @Override + public List queryByWorkflowInstanceId(Integer workflowInstanceId, + IsolationTaskStatus isolationTaskStatus) { + return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode()); + } + + @Override + public Optional queryById(long isolationTaskId) { + return Optional.ofNullable(isolationTaskMapper.selectById(isolationTaskId)); + } + + @Override + public List queryByIds(List isolationTaskIds) { + return isolationTaskMapper.selectBatchIds(isolationTaskIds); + } + + @Override + public List queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) { + return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode()); + } + + @Override + public int deleteByIdAndStatus(long id, IsolationTaskStatus status) { + return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode()); + } + + @Override + public void insert(IsolationTask isolationTask) { + isolationTaskMapper.insert(isolationTask); + } + + @Override + public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) { + isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode()); + } + + @Override + public void batchInsert(List isolationTasks) { + if (CollectionUtils.isEmpty(isolationTasks)) { + return; + } + isolationTaskMapper.batchInsert(isolationTasks); + } +} diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1c77aea33feff01f6d676a1d1410b55c2d59a893 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.dao.repository.impl; + +import lombok.NonNull; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.Optional; + +@Repository +public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao { + + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Override + public Optional queryProcessDefinitionByCode(@NonNull Long processDefinitionCode, + @NonNull Integer processDefinitionVersion) { + return Optional.ofNullable( + processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, + processDefinitionVersion)); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..9cdf6c7de68b0c088e86ec9724df20e88ee0beb3 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.dao.repository.impl; + 
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao { + + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + + @Override + public List queryProcessTaskRelationLogByWorkflow( + long workflowDefinitionCode, + int workflowDefinitionVersion) { + return processTaskRelationLogMapper.queryByProcessCodeAndVersion(workflowDefinitionCode, + workflowDefinitionVersion); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..205fc3b6494d34142a3d5ad25b0e1f95c489edb1 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java @@ -0,0 +1,44 @@ +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + 
+@Repository +public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao { + + @Autowired + private TaskDefinitionLogMapper taskDefinitionLogMapper; + + @Override + public List queryTaskDefinitionByRelations(List processTaskRelations) { + List taskDefinitionLogs = new ArrayList<>(); + if (CollectionUtils.isEmpty(processTaskRelations)) { + return taskDefinitionLogs; + } + Map taskCodeVersionMap = new HashMap<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPreTaskCode() > 0) { + taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()); + } + if (processTaskRelation.getPostTaskCode() > 0) { + taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()); + } + } + + taskCodeVersionMap.forEach((code, version) -> { + taskDefinitionLogs.add(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version)); + }); + return taskDefinitionLogs; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 510b5d7a71cd35c881c8370707c46bf72ae475a5..0512980b109ed24466c6e61ae4e564ce418c5361 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository.impl; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.exception.RepositoryException; import 
org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; @@ -28,6 +29,11 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { return taskInstanceMapper.selectBatchIds(taskInstanceIds); } + @Override + public List queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) { + return taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES); + } + @Override public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException { try { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 3f25bb0f49c85314234050e9b4a48f68842f5715..3ab01171e1fa635558b1e51c40f405a5fbbcae06 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -17,7 +17,9 @@ package org.apache.dolphinscheduler.dao.utils; +import com.google.common.collect.Sets; import lombok.NonNull; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -30,9 +32,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.commons.collections.CollectionUtils; - +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -41,12 +45,6 @@ import 
java.util.List; import java.util.Map; import java.util.Set; -import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - /** * dag tools */ @@ -591,4 +589,89 @@ public class DagHelper { } return false; } + + /** + * Get all post nodes of the given node. + *

+ * e.g. the given DAG is A -> B -> C, and taskNode is A, return result is A,B,C + */ + public static Set getAllPostNodes(@NonNull String taskNode, + @NonNull DAG workflowDAG) { + Set result = Sets.newHashSet(); + Set currentLoopNodes = Sets.newHashSet(taskNode); + Set visitedNodes = new HashSet<>(); + while (!currentLoopNodes.isEmpty()) { + Set tmp = new HashSet<>(); + for (String currentNode : currentLoopNodes) { + result.add(currentNode); + Set subsequentNodes = workflowDAG.getSubsequentNodes(currentNode); + visitedNodes.add(currentNode); + subsequentNodes.forEach(s -> { + if (!visitedNodes.contains(s)) { + tmp.add(s); + } + }); + } + currentLoopNodes = tmp; + } + return result; + } + + /** + * Get all pre nodes of the given node. + *

+ * e.g. the given DAG is A -> B -> C, and taskNode is B, return result is A,B + */ + public static Set getAllPreNodes(@NonNull String taskNode, + @NonNull DAG workflowDAG) { + Set result = Sets.newHashSet(); + Set currentLoopNodes = Sets.newHashSet(taskNode); + Set visitedNodes = new HashSet<>(); + while (CollectionUtils.isNotEmpty(currentLoopNodes)) { + Set tmp = new HashSet<>(); + for (String currentNode : currentLoopNodes) { + result.add(currentNode); + visitedNodes.add(currentNode); + workflowDAG.getPreviousNodes(currentNode).forEach(s -> { + if (!visitedNodes.contains(s)) { + tmp.add(s); + } + }); + } + currentLoopNodes = tmp; + } + return result; + } + + /** + * Judge if the givenTaskNode is the child of any given parentNodes or is belongs to parentNodes. + */ + public static boolean isChildOfAnyParentNodes(@NonNull String givenTaskNode, + @NonNull Set parentNodes, + @NonNull DAG workflowDAG) { + if (CollectionUtils.isEmpty(parentNodes)) { + return false; + } + + if (parentNodes.contains(givenTaskNode)) { + return true; + } + + Set postNodesOfParent = new HashSet<>(); + Set visited = new HashSet<>(); + while (CollectionUtils.isNotEmpty(parentNodes)) { + Set tmp = new HashSet<>(); + for (String parentNode : parentNodes) { + postNodesOfParent.add(parentNode); + visited.add(parentNode); + workflowDAG.getSubsequentNodes(parentNode).forEach(node -> { + if (!visited.contains(node)) { + tmp.add(node); + } + }); + } + parentNodes = tmp; + } + return postNodesOfParent.contains(givenTaskNode); + } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml new file mode 100644 index 0000000000000000000000000000000000000000..4a7100010e8472c0e07edb32ac157cc675ed19cf --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/IsolationTaskMapper.xml @@ -0,0 +1,97 @@ + + + 
+ + + + id + ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time + + + + + + + + + + + + update t_ds_isolation_task + set status = #{status} + where id = #{id} + + + + insert into t_ds_isolation_task ( + workflow_instance_id, + workflow_instance_name, + task_name, + task_code, + status + ) + values + + ( + #{isolationTask.workflowInstanceId}, + #{isolationTask.workflowInstanceName}, + #{isolationTask.taskName}, + #{isolationTask.taskCode}, + #{isolationTask.status} + ) + + + + + delete + from t_ds_isolation_task + where id = #{id} + and status = #{status} + + + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 2422318c10c16d87083d7bf342254eae8e2cafca..8f6df6738cd0793df750a4d34d73c488ff430cb8 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -1977,3 +1977,20 @@ CREATE TABLE t_ds_fav user_id int NOT NULL, PRIMARY KEY (id) ); + +-- ---------------------------- +-- Table structure for t_ds_isolation_task +-- ---------------------------- +DROP TABLE if exists `t_ds_isolation_task`; +CREATE TABLE `t_ds_isolation_task` +( + `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task', + `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task', + `task_name` varchar(255) NOT NULL COMMENT 'isolation task name', + `task_code` bigint NOT NULL COMMENT 'isolation task code', + `status` tinyint(4) DEFAULT '0', + `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) +); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 6df6a9b0fef5b5a43288b75940a25654cc71d231..e5b69db827a350334f0bac32cd8c31e2a0b8032a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -1896,22 +1896,22 @@ CREATE TABLE `t_ds_k8s` ( -- Table structure for t_ds_k8s_namespace -- ---------------------------- DROP TABLE IF EXISTS `t_ds_k8s_namespace`; -CREATE TABLE `t_ds_k8s_namespace` ( - `id` int(11) NOT NULL AUTO_INCREMENT, - `limits_memory` int(11) DEFAULT NULL, - `namespace` varchar(100) DEFAULT NULL, - `online_job_num` int(11) DEFAULT NULL, - `owner` varchar(100) DEFAULT NULL, - `pod_replicas` int(11) DEFAULT NULL, - `pod_request_cpu` decimal(14,3) DEFAULT NULL, - `pod_request_memory` int(11) DEFAULT NULL, - `tag` varchar(100) DEFAULT NULL, - `limits_cpu` decimal(14,3) DEFAULT NULL, - `k8s` varchar(100) DEFAULT NULL, - `create_time` datetime DEFAULT NULL COMMENT 'create time', - `update_time` datetime DEFAULT NULL COMMENT 'update time', - PRIMARY KEY (`id`), - UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`) +CREATE TABLE `t_ds_k8s_namespace`( + `id` int(11) NOT NULL AUTO_INCREMENT, + `limits_memory` int(11) DEFAULT NULL, + `namespace` varchar(100) DEFAULT NULL, + `online_job_num` int(11) DEFAULT NULL, + `owner` varchar(100) DEFAULT NULL, + `pod_replicas` int(11) DEFAULT NULL, + `pod_request_cpu` decimal(14, 3) DEFAULT NULL, + `pod_request_memory` int(11) DEFAULT NULL, + `tag` varchar(100) DEFAULT NULL, + `limits_cpu` decimal(14, 3) DEFAULT NULL, + `k8s` varchar(100) DEFAULT NULL, + `create_time` datetime DEFAULT NULL COMMENT 'create time', + `update_time` datetime DEFAULT NULL COMMENT 'update time', + PRIMARY KEY (`id`), + UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`) ) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8; @@ -1919,21 +1919,22 @@ CREATE TABLE `t_ds_k8s_namespace` ( -- Table structure for 
t_ds_alert_send_status -- ---------------------------- DROP TABLE IF EXISTS t_ds_alert_send_status; -CREATE TABLE t_ds_alert_send_status( - `id` int(11) NOT NULL AUTO_INCREMENT, - `alert_id` int(11) NOT NULL, - `alert_plugin_instance_id` int(11) NOT NULL, - `send_status` tinyint(4) DEFAULT '0', - `log` text, - `create_time` datetime DEFAULT NULL COMMENT 'create time', - PRIMARY KEY (`id`), - UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`) -) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; +DROP TABLE IF EXISTS `t_ds_fav_task`; -- ---------------------------- -- Table structure for t_ds_fav_task -- ---------------------------- -DROP TABLE IF EXISTS `t_ds_fav_task`; +CREATE TABLE t_ds_alert_send_status +( + `id` int(11) NOT NULL AUTO_INCREMENT, + `alert_id` int(11) NOT NULL, + `alert_plugin_instance_id` int(11) NOT NULL, + `send_status` tinyint(4) DEFAULT '0', + `log` text, + `create_time` datetime DEFAULT NULL COMMENT 'create time', + PRIMARY KEY (`id`), + UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; CREATE TABLE `t_ds_fav_task` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'favorite task id', @@ -1943,3 +1944,21 @@ CREATE TABLE `t_ds_fav_task` ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8; + +-- ---------------------------- +-- Table structure for t_ds_isolation_task +-- ---------------------------- +DROP TABLE if exists `t_ds_isolation_task`; +CREATE TABLE `t_ds_isolation_task` +( + `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task', + `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task', + `task_name` varchar(255) NOT NULL COMMENT 'isolation task name', + `task_code` bigint NOT NULL COMMENT 'isolation task code', + `status` tinyint(4) DEFAULT '0', + `create_time` timestamp NULL 
DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE +) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 15bf31236a4cc4e2606d96862a81cee2d4b0abe3..f9bb5587ccc449c53e8bb8ea78b7f461c620362e 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -1941,3 +1941,21 @@ CREATE TABLE t_ds_fav user_id int NOT NULL, PRIMARY KEY (id) ); + +-- ---------------------------- +-- Table structure for t_ds_isolation_task +-- ---------------------------- +DROP TABLE if exists `t_ds_isolation_task`; +CREATE TABLE `t_ds_isolation_task` +( + `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task', + `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task', + `task_name` varchar(255) NOT NULL COMMENT 'isolation task name', + `task_code` bigint NOT NULL COMMENT 'isolation task code', + `status` tinyint(4) DEFAULT '0', + `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + CONSTRAINT `workflow_instance_index` (`workflow_instance_id`) +); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index a80660b5492de92ea7525c8ce7bae7a54bce8274..de1890f1508c3de98d2cf87f82ffa64740bcc85b 100644 --- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.dao.utils; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; - +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.Sets; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; @@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; +import org.junit.Assert; +import org.junit.Test; import java.io.IOException; import java.util.ArrayList; @@ -40,10 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.junit.Assert; -import org.junit.Test; - -import com.fasterxml.jackson.core.JsonProcessingException; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; /** * dag helper test @@ -335,9 +334,8 @@ public class DagHelperTest { * 2->8->5->7 * * @return dag - * @throws JsonProcessingException if error throws JsonProcessingException */ - private DAG generateDag() throws IOException { + private DAG generateDag() { List taskNodeList = new ArrayList<>(); TaskNode node1 = new TaskNode(); node1.setId("1"); @@ -529,4 +527,23 @@ public class DagHelperTest { Assert.assertNotNull(dag); } + @Test + public void getAllPostNodes() { + DAG dag = generateDag(); + Set allPostNodes = DagHelper.getAllPostNodes("2", dag); + Assert.assertEquals(Sets.newHashSet("2", "3", "5", "6", "7", "8"), allPostNodes); + } + + @Test + public void 
getAllPreNodes() { + DAG dag = generateDag(); + Set allPostNodes = DagHelper.getAllPreNodes("2", dag); + Assert.assertEquals(Sets.newHashSet("1", "2"), allPostNodes); + } + + @Test + public void isChildOfAnyParentNodes() { + DAG dag = generateDag(); + Assert.assertTrue(DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet("1"), dag)); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..826a456fd82db451b114ab38a34f65d4e0bb3245 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/isolation/RefreshIsolationTaskProcessor.java @@ -0,0 +1,26 @@ +package org.apache.dolphinscheduler.server.master.processor.isolation; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class RefreshIsolationTaskProcessor implements NettyRequestProcessor { + + @Autowired + private IsolationTaskManager isolationTaskManager; + + @Override + public void process(Channel channel, Command command) { + if (command.getType() != CommandType.REFRESH_ISOLATION_REQUEST) { + throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command)); + } + isolationTaskManager.refreshIsolationTaskMapFromDB(); + } +} diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index c2eb200059c4aa28f4dfab2ec8caa5e5917b116b..eb4d82cd3d336501d05478eb38bdb04b5e300154 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingData import javax.annotation.PostConstruct; +import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -74,6 +76,9 @@ public class MasterRPCServer implements AutoCloseable { @Autowired private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor; + @Autowired + private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor; + public void start() { logger.info("Starting Master RPC Server..."); // init remoting server @@ -96,6 +101,9 @@ public class MasterRPCServer implements AutoCloseable { this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, 
loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST, + refreshIsolationTaskProcessor); + this.nettyRemotingServer.start(); logger.info("Started Master RPC Server..."); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index 5546b474d774962f736a61e5271fda288850be33..6446248c065f354a9816fcff7adba125fd0bbfff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager; import org.apache.dolphinscheduler.server.master.service.MasterFailoverService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,9 @@ public class FailoverExecuteThread extends BaseDaemonThread { @Autowired private MasterFailoverService masterFailoverService; + @Autowired + private IsolationTaskManager isolationTaskManager; + protected FailoverExecuteThread() { super("FailoverExecuteThread"); } @@ -66,6 +70,7 @@ public class FailoverExecuteThread extends BaseDaemonThread { // todo: DO we need to schedule a task to do this kind of check // This kind of check may only need to be executed when a master server start masterFailoverService.checkMasterFailover(); + isolationTaskManager.refreshIsolationTaskMapFromDB(); } catch (Exception e) { logger.error("Master failover thread execute error", 
e); } finally { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 24982437a239c440a58e15143ceabdfa20211a5d..1d823a71a7f2cdb2e9effb2d430d3290fb039854 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; @@ -108,6 +109,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private StateEventCallbackService stateEventCallbackService; + @Autowired + private IsolationTaskDao isolationTaskDao; + private String masterAddress; protected MasterSchedulerBootstrap() { @@ -184,7 +188,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl processAlertManager, masterConfig, stateWheelExecuteThread, - curingGlobalParamsService); + curingGlobalParamsService, + isolationTaskDao); processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable); workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId())); diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index d05bdf8015efabd0b4ced746a007237019cbb744..e102238178e022c28e9c9645bb9de09b19369dca 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -37,8 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; @@ -47,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; @@ -188,6 +191,9 @@ public class WorkflowExecuteRunnable implements Callable { private final Set stateCleanTaskCodes = new HashSet<>(); + // isolationTaskCode -> isolatedTimes + private final Map 
isolationTaskCodesToTimesMap = new HashMap<>(); + /** * state event queue */ @@ -208,6 +214,8 @@ public class WorkflowExecuteRunnable implements Callable { private final CuringParamsService curingParamsService; + private final IsolationTaskDao isolationTaskDao; + private final String masterAddress; /** @@ -227,7 +235,8 @@ public class WorkflowExecuteRunnable implements Callable { @NonNull ProcessAlertManager processAlertManager, @NonNull MasterConfig masterConfig, @NonNull StateWheelExecuteThread stateWheelExecuteThread, - @NonNull CuringParamsService curingParamsService) { + @NonNull CuringParamsService curingParamsService, + @NonNull IsolationTaskDao isolationTaskDao) { this.processService = processService; this.processInstanceDao = processInstanceDao; this.processInstance = processInstance; @@ -235,6 +244,7 @@ public class WorkflowExecuteRunnable implements Callable { this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; this.curingParamsService = curingParamsService; + this.isolationTaskDao = isolationTaskDao; this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); } @@ -435,6 +445,84 @@ public class WorkflowExecuteRunnable implements Callable { } } + public void onlineTaskIsolation(long taskCode) { + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + // find the post running task + Set needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag); + // if the current task is finished, kill the post task + // we need to submit an online isolation task event, otherwise there may exist concurrent problem + for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) { + Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr); + isolationTaskCodesToTimesMap.put(isolationTaskCode, + isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1); + ITaskProcessor iTaskProcessor 
= activeTaskProcessorMaps.get(isolationTaskCode); + if (iTaskProcessor == null) { + logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}", + isolationTaskCode); + continue; + } + iTaskProcessor.action(TaskAction.ISOLATE); + StateEvent stateEvent = new StateEvent(); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + stateEvent.setProcessInstanceId(this.processInstance.getId()); + stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId()); + stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState()); + this.addStateEvent(stateEvent); + } + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } + } + + public void cancelTaskIsolation(long taskCode) { + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + // todo: + // restart the killed/ paused task + Set needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag); + for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) { + Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr); + Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode); + if (isolateTimes == null) { + logger.warn( + "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}", + isolationTaskCode); + continue; + } + if (isolateTimes == 1) { + isolationTaskCodesToTimesMap.remove(isolationTaskCode); + ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode); + if (iTaskProcessor == null) { + // the current task has not been submitted + continue; + } + // the current task has no pre isolation, restart + TaskInstance taskInstance = iTaskProcessor.taskInstance(); + if (taskInstance.getState() == ExecutionStatus.PAUSE_BY_ISOLATION) { + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + addTaskToStandByList(taskInstance); + logger.info( + "Cancel isolation task, the current task state is pause_by_isolation, change 
to submitted_success and add it back to standbyList"); + continue; + } + if (taskInstance.getState() == ExecutionStatus.KILL_BY_ISOLATION) { + addTaskToStandByList(cloneCancelIsolationTaskInstance(taskInstance)); + logger.info( + "Cancel isolation task, the current task state is kill_by_isolation, change to submitted_success and add it back to standbyList"); + continue; + } + } else { + isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1); + } + + } + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } + + } + /** * crate new task instance to retry, different objects from the original * @@ -798,6 +886,15 @@ public class WorkflowExecuteRunnable implements Callable { } // generate process dag dag = DagHelper.buildDagGraph(processDag); + List isolationTasks = + isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE); + for (IsolationTask isolationTask : isolationTasks) { + Set allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag); + allPostNodes.forEach(postNode -> { + isolationTaskCodesToTimesMap.put(isolationTask.getTaskCode(), + isolationTaskCodesToTimesMap.getOrDefault(isolationTask.getTaskCode(), 0) + 1); + }); + } logger.info("Build dag success, dag: {}", dag); } @@ -933,6 +1030,12 @@ public class WorkflowExecuteRunnable implements Callable { */ private Optional submitTaskExec(TaskInstance taskInstance) { try { + if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) { + taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION); + logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}", + taskInstance.getTaskCode()); + } + // package task instance before submit processService.packageTaskInstance(taskInstance, processInstance); @@ -1117,6 +1220,17 @@ public class WorkflowExecuteRunnable implements Callable { return newTaskInstance; } + private @Nullable TaskInstance 
cloneCancelIsolationTaskInstance(TaskInstance taskInstance) { + TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); + if (taskNode == null) { + logger.error("clone retry task instance error, taskNode is null, code:{}", taskInstance.getTaskCode()); + return null; + } + TaskInstance newTaskInstance = TaskInstanceUtils.cloneTaskInstance(processInstance, taskNode, taskInstance); + TaskInstanceUtils.injectEnvironment(taskInstance, getTaskInstanceEnvironment(newTaskInstance)); + return newTaskInstance; + } + private @Nullable Environment getTaskInstanceEnvironment(@NonNull TaskInstance taskInstance) { Environment environment = null; if (!taskInstance.getEnvironmentCode().equals(-1L)) { @@ -1468,17 +1582,21 @@ public class WorkflowExecuteRunnable implements Callable { // success if (state == ExecutionStatus.RUNNING_EXECUTION) { - List killTasks = getCompleteTaskByState(ExecutionStatus.KILL); if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) { // tasks currently pending submission, no retries, indicating that depend is waiting to complete return ExecutionStatus.RUNNING_EXECUTION; - } else if (CollectionUtils.isNotEmpty(killTasks)) { + } + if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL))) { // tasks maybe killed manually return ExecutionStatus.FAILURE; - } else { - // if the waiting queue is empty and the status is in progress, then success - return ExecutionStatus.SUCCESS; } + if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL_BY_ISOLATION)) + || CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.PAUSE_BY_ISOLATION))) { + // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION + return ExecutionStatus.PAUSE_BY_ISOLATION; + } + // if the waiting queue is empty and the status is in progress, then success + return ExecutionStatus.SUCCESS; } return state; diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index ad3d42a3a5225f32faf3cb659d1cdb8c7c0324bb..e86a93ffc111e6ec2683e63220484c37d717e7e8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; @@ -111,6 +112,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected boolean timeout = false; + protected boolean isolated = false; + protected TaskInstance taskInstance = null; protected ProcessInstance processInstance; @@ -160,6 +163,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { */ protected abstract boolean killTask(); + protected abstract boolean isolateTask(); + protected abstract boolean taskTimeout(); protected abstract boolean runTask(); @@ -193,6 +198,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { case DISPATCH: result = dispatch(); break; + case ISOLATE: + result = isolate(); + break; default: logger.error("unknown task action: {}", taskAction); } @@ -256,6 +264,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return paused; } + private boolean isolate() { + if 
(isolated) { + return true; + } + isolated = isolateTask(); + return isolated; + } + protected boolean stop() { if (killed) { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java index 5fcad2885848323d03b40a751729cd3a9d562c95..328562318625d8a9529ab0407ce3307324844c6b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -21,7 +21,6 @@ import com.google.auto.service.AutoService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.BlockingOpportunity; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; @@ -30,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.server.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -89,6 +87,15 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean isolateTask() { + taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + 
logger.info("Blocking task has been isolated"); + return true; + } + @Override protected boolean taskTimeout() { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 7b45be63730fcd93f8638ddea6d72b7a224dc861..e1ef11eef3b96d3cb8609735b0568236c8d819dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task; +import com.google.auto.service.AutoService; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import java.util.Date; -import com.google.auto.service.AutoService; - /** * common task processor */ @@ -130,6 +129,27 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean isolateTask() { + try { + if (taskInstance.getState().typeIsFinished()) { + return true; + } + taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + if (StringUtils.isNotEmpty(taskInstance.getHost())) { + killRemoteTask(); + } + logger.info("Master isolate taskInstance success, taskName: {} taskInstanceId: {}", taskInstance.getName(), + taskInstance.getId()); + return true; + } catch (Exception e) { + logger.error("Master isolate task error, taskInstance id: {}", taskInstance.getId(), e); + return false; + } + } + @Override protected boolean taskTimeout() { return 
true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 7d3669c4433dc9c1d4639dc0e945fade7138bd69..5c76496aa368e701e9ad108f1e50933c9f47719a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner.task; import com.google.auto.service.AutoService; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; @@ -27,7 +26,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.server.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -106,6 +104,15 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean isolateTask() { + taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + logger.info("Condition task has been isolated"); + return true; + } + @Override public String getType() { return TASK_TYPE_CONDITIONS; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 6dc5cc6b7f18fda2434b9250250d89b1d9ac49f3..d5fe056820df51c5b83eb670464bc896529e59d2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -202,6 +202,14 @@ public class DependentTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean isolateTask() { + taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + } + /** * judge all dependent tasks finish * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 754eba7d507f540131755b1cfc22ced74a2f8a22..a681366ea5b9c1fc104f6ba4cdca123822a56c90 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -201,6 +201,17 @@ public class SubTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean isolateTask() { + ProcessInstance subProcessInstance = + processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); + if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { + return true; + } + // todo: isolate sub process + return true; + } + private void sendToSubProcess() { StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( processInstance.getId(), taskInstance.getId(), 
subProcessInstance.getState(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index e3ba4c4db02a51b9ce1e73baf978ef8ecf004c94..8ed3c42ce88be19a8288fa99d607942010801fa8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -85,6 +85,14 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean isolateTask() { + taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + } + @Override protected boolean taskTimeout() { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java index 9044945258563d4e91751560f722051d6d7d40a0..2c60d364bc558d215ec1777e47055a7a29e9852a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java @@ -22,6 +22,7 @@ package org.apache.dolphinscheduler.server.master.runner.task; */ public enum TaskAction { PAUSE, + ISOLATE, STOP, TIMEOUT, SUBMIT, diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java new file mode 100644 index 
0000000000000000000000000000000000000000..2621766d84705f7015b2cf7b373c6769784e0c74 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/IsolationTaskManager.java @@ -0,0 +1,97 @@ +package org.apache.dolphinscheduler.server.master.service; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus; +import org.apache.dolphinscheduler.dao.entity.IsolationTask; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class IsolationTaskManager { + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + private volatile Map onlineIsolationTasksInMemory = new HashMap<>(); + + private volatile Set currentIsolationTaskIdsInMemory = new HashSet<>(); + + @Autowired + private IsolationTaskDao isolationTaskDao; + + @PostConstruct + public void init() { + refreshIsolationTaskMapFromDB(); + } + + public void refreshIsolationTaskMapFromDB() { + Map totalOnlineIsolationTasksInDB = + isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE) + .stream() + .collect(Collectors.toMap(IsolationTask::getId, Function.identity())); + Set totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet(); + + Collection needToOfflineIsolationTasks = + CollectionUtils.subtract(currentIsolationTaskIdsInMemory, 
totalOnlineIsolationTaskCodesInDB) + .stream() + .map(onlineIsolationTasksInMemory::get) + .collect(Collectors.toList()); + + Collection needToOnlineIsolationTasks = + CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory) + .stream() + .map(totalOnlineIsolationTasksInDB::get) + .collect(Collectors.toList()); + + currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB; + onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB; + + offlineIsolationTask(needToOfflineIsolationTasks); + onlineIsolationTask(needToOnlineIsolationTasks); + } + + private void offlineIsolationTask(Collection needOfflineIsolationTasks) { + if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) { + return; + } + for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) { + WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager + .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId()); + if (workflowExecuteRunnable == null) { + continue; + } + workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode()); + log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId()); + } + } + + private void onlineIsolationTask(Collection needOnlineIsolationTasks) { + if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) { + return; + } + for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) { + WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager + .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId()); + if (workflowExecuteRunnable == null) { + continue; + } + workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode()); + log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId()); + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java index 5d86abc8e4402b7f3fee6c0ec85ef6b23ef00e59..e4bdf3511ca57b9417438a984b751be908a2d190 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/TaskInstanceUtils.java @@ -166,7 +166,6 @@ public class TaskInstanceUtils { taskInstance.setTaskDefine(oldTaskInstance.getTaskDefine()); taskInstance.setProcessDefine(oldTaskInstance.getProcessDefine()); taskInstance.setProcessInstance(processInstance); - taskInstance.setState(oldTaskInstance.getState()); return taskInstance; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java index e1088bf83d06d60e0f9864e46c4828cdd29aefa7..00ae91019d5e234db31a5092ff286d09719497a5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtils.java @@ -36,6 +36,8 @@ public class WorkflowInstanceUtils { return getStartTaskInstanceIdsFromRecoverParam(processInstance); case START_FROM_STATE_CLEAN_TASKS: return getStartTaskInstanceIdsFromStateCleanParam(processInstance); + case RECOVERY_FROM_ISOLATION_TASKS: + return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance); default: return Collections.emptyList(); } @@ -69,6 +71,22 @@ public class WorkflowInstanceUtils { return stateCleanTaskInstanceIds; } + public static List getStartTaskInstanceIdsFromRecoverIsolationParam(@NonNull ProcessInstance processInstance) { + Map commandParamMap = JSONUtils.toMap(processInstance.getCommandParam()); + List 
recoveryPausedIsolationIds = + JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS), + new TypeReference>() { + }); + List recoveryKilledIsolationIds = + JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS), + new TypeReference>() { + }); + List result = new ArrayList<>(); + result.addAll(recoveryPausedIsolationIds); + result.addAll(recoveryKilledIsolationIds); + return result; + } + public static List getStartNodeName(@NonNull ProcessInstance processInstance) { List startNodeNameList = new ArrayList<>(); Map paramMap = JSONUtils.toMap(processInstance.getCommandParam()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 6f55e1d5b53f75b503a6d112b92b34e40c0dee46..856d38cb2fb0e0099df0b9cf021853bfcec9232e 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; +import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; @@ -92,6 +93,8 @@ public class WorkflowExecuteRunnableTest { private CuringParamsService curingGlobalParamsService; + private IsolationTaskDao isolationTaskDao; + @Before public void init() throws 
Exception { applicationContext = mock(ApplicationContext.class); @@ -124,10 +127,11 @@ public class WorkflowExecuteRunnableTest { curingGlobalParamsService = mock(CuringParamsService.class); NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class); ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class); + isolationTaskDao = mock(IsolationTaskDao.class); workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, - curingGlobalParamsService)); + curingGlobalParamsService, isolationTaskDao)); // prepareProcess init dag Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 37f0b97f223dc958f48af0b155e9946eee6f95a0..daf92071b5ec73f99494839ec4efa9d8213295c5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -119,5 +119,9 @@ public enum CommandType { /** * workflow executing data response, from master to api */ - WORKFLOW_EXECUTING_DATA_RESPONSE; + WORKFLOW_EXECUTING_DATA_RESPONSE, + + REFRESH_ISOLATION_REQUEST, + ; + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/isolation/RefreshIsolationTaskRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..9b097843c46159b0affef1b926a42dcd422cde81 --- /dev/null +++ 
package org.apache.dolphinscheduler.remote.command.isolation;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;

import java.io.Serializable;

/**
 * Field-less request that is sent as a {@link CommandType#REFRESH_ISOLATION_REQUEST} command.
 */
public class RefreshIsolationTaskRequest implements Serializable {

    /**
     * Builds the remote {@link Command} for this request.
     *
     * @return a command typed {@link CommandType#REFRESH_ISOLATION_REQUEST} whose body is
     *         the JSON byte representation of this object
     */
    public Command convert2Command() {
        Command refreshCommand = new Command();
        refreshCommand.setType(CommandType.REFRESH_ISOLATION_REQUEST);
        refreshCommand.setBody(JSONUtils.toJsonByteArray(this));
        return refreshCommand;
    }
}
commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS), + new TypeReference>() { + }); + if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) { + recoveryKilledIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id))); + } + processInstance.setRunTimes(runTime + 1); + processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); + break; default: break; } @@ -1564,9 +1585,6 @@ public class ProcessServiceImpl implements ProcessService { taskInstance.getTaskCode()); return null; } - if (processInstanceState == ExecutionStatus.READY_PAUSE) { - taskInstance.setState(ExecutionStatus.PAUSE); - } taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstance)); if (taskInstance.getSubmitTime() == null) { @@ -1603,7 +1621,8 @@ public class ProcessServiceImpl implements ProcessService { if (state == ExecutionStatus.RUNNING_EXECUTION || state == ExecutionStatus.DELAY_EXECUTION || state == ExecutionStatus.KILL - || state == ExecutionStatus.DISPATCH) { + || state == ExecutionStatus.DISPATCH + || state == ExecutionStatus.PAUSE_BY_ISOLATION) { return state; } // return pasue /stop if process instance state is ready pause / stop diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java index da728b2fdb3d55b3132d515c6a3aab69b224b006..a83184e16b880b55b14fb074cc25ae51d3aa30c4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java @@ -17,10 +17,10 @@ package 
org.apache.dolphinscheduler.plugin.task.api.enums; -import java.util.HashMap; - import com.baomidou.mybatisplus.annotation.EnumValue; +import java.util.HashMap; + /** * running status for workflow and task nodes */ @@ -65,6 +65,8 @@ public enum ExecutionStatus { READY_BLOCK(15, "ready block"), BLOCK(16, "block"), DISPATCH(17, "dispatch"), + PAUSE_BY_ISOLATION(18, "paused by isolation"), + KILL_BY_ISOLATION(19, "killed by isolation"), ; ExecutionStatus(int code, String descp) { @@ -117,10 +119,14 @@ public enum ExecutionStatus { * @return status */ public boolean typeIsFinished() { - return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() + return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation() || typeIsStop() || typeIsBlock(); } + public boolean typeIsReady() { + return this == READY_PAUSE || this == READY_STOP || this == READY_BLOCK; + } + /** * status is waiting thread * @@ -139,6 +145,18 @@ public enum ExecutionStatus { return this == PAUSE; } + public boolean typeIsPauseByIsolation() { + return this == PAUSE_BY_ISOLATION; + } + + public boolean typeIsKilledByIsolation() { + return this == KILL_BY_ISOLATION; + } + + public boolean typeIsIsolated() { + return this == PAUSE_BY_ISOLATION || this == KILL_BY_ISOLATION; + } + /** * status is pause * @@ -172,7 +190,7 @@ public enum ExecutionStatus { * @return status */ public boolean typeIsCancel() { - return this == KILL || this == STOP; + return this == KILL || this == STOP || this == KILL_BY_ISOLATION; } public int getCode() {