提交 8268714c,作者:Wenjun Ruan

Implement isolate task api interface

上级 340064f5
package org.apache.dolphinscheduler.api.checker;
import lombok.NonNull;
import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
/**
 * Permission/precondition checker for isolation-task operations
 * (submit / online / listing / delete / cancel).
 * <p>
 * Implementations throw a {@code ServiceException} when a check fails;
 * a normal return means the operation is allowed to proceed.
 */
public interface IsolationTaskChecker {

    /**
     * Check whether the user may submit the given isolation tasks.
     *
     * @param loginUser       current login user, never null
     * @param projectCode     code of the project the operation is issued under
     * @param processInstance related workflow instance; may be null — implementations are
     *                        expected to reject a null instance
     * @param voList          parsed isolation-task rows, never null
     */
    void checkCanSubmitTaskIsolation(@NonNull User loginUser,
                                     long projectCode,
                                     ProcessInstance processInstance,
                                     @NonNull List<IsolationTaskExcelParseVO> voList);

    /**
     * Check whether the user may bring the given isolation task online.
     */
    void checkCanOnlineTaskIsolation(@NonNull User loginUser,
                                     long projectCode,
                                     @NonNull ProcessInstance processInstance,
                                     @NonNull IsolationTask isolationTask);

    /**
     * Check whether the user may list isolation tasks of the project.
     */
    void checkCanListingTaskIsolation(@NonNull User loginUser,
                                      long projectCode);

    /**
     * Check whether the user may delete the isolation task with the given id.
     */
    void checkCanDeleteTaskIsolation(@NonNull User loginUser,
                                     long projectCode,
                                     long isolationId);

    /**
     * Check whether the user may cancel the given isolation task.
     *
     * @param processInstance related workflow instance; may be null — implementations are
     *                        expected to reject a null instance
     */
    void checkCanCancelTaskIsolation(@NonNull User loginUser,
                                     long projectCode,
                                     ProcessInstance processInstance,
                                     @NonNull IsolationTask isolationTasks);
}
package org.apache.dolphinscheduler.api.checker;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_CANCEL;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_LIST;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_ONLINE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ISOLATION_TASK_SUBMIT;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
/**
 * Default implementation of {@link IsolationTaskChecker}.
 * <p>
 * Each check validates, in order: the project exists and is visible to the user,
 * the related workflow instance/definition exists and belongs to the project,
 * the user holds the function permission, and the isolation task is in a state
 * that allows the requested transition. Failures surface as {@code ServiceException}.
 */
@Component
public class IsolationTaskCheckerImpl implements IsolationTaskChecker {

    @Autowired
    private ProjectService projectService;

    @Autowired
    private ProcessDefinitionLogDao processDefinitionLogDao;

    @Autowired
    private WorkflowDAGService workflowDAGService;

    @Autowired
    private IsolationTaskDao isolationTaskDao;

    /**
     * Validate a submit request: the workflow instance must exist, belong to the project,
     * not be in a ready_* state, and every requested task must exist in the workflow DAG
     * without an already-online isolation on any upstream task.
     */
    @Override
    public void checkCanSubmitTaskIsolation(@NonNull User loginUser,
                                            long projectCode,
                                            ProcessInstance processInstance,
                                            @NonNull List<IsolationTaskExcelParseVO> voList) {
        Project project = projectService.queryByCode(loginUser, projectCode);
        if (processInstance == null) {
            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
        }
        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
                        processInstance.getProcessDefinitionVersion())
                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
        checkSubmitIsolateTaskAuth(loginUser, project, processDefinitionLog);
        DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(processDefinitionLog);
        checkWorkflowInstanceCanSubmitIsolateTask(processInstance.getName(), processInstance);
        checkTaskNodeCanSubmitIsolateTask(voList, workflowDAG);
    }

    /**
     * Validate an online request: permission, the task must not already be online,
     * and the workflow instance must be in a state that supports isolation.
     */
    @Override
    public void checkCanOnlineTaskIsolation(@NonNull User loginUser,
                                            long projectCode,
                                            ProcessInstance processInstance,
                                            @NonNull IsolationTask isolationTask) {
        Project project = projectService.queryByCode(loginUser, projectCode);
        checkOnlineIsolationTaskAuth(loginUser, project, processInstance);
        if (IsolationTaskStatus.ONLINE.getCode() == isolationTask.getStatus()) {
            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE);
        }
        String workflowInstanceName = isolationTask.getWorkflowInstanceName();
        checkWorkflowInstanceCanSubmitIsolateTask(workflowInstanceName, processInstance);
    }

    /**
     * Validate a listing request: only project visibility and the list permission are required.
     */
    @Override
    public void checkCanListingTaskIsolation(@NonNull User loginUser, long projectCode) {
        Project project = projectService.queryByCode(loginUser, projectCode);
        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_LIST);
    }

    /**
     * Validate a delete request: permission, the task must exist and already be offline.
     */
    @Override
    public void checkCanDeleteTaskIsolation(@NonNull User loginUser,
                                            long projectCode,
                                            long isolationId) {
        Project project = projectService.queryByCode(loginUser, projectCode);
        projectService.checkProjectAndAuth(loginUser, project, projectCode, ISOLATION_TASK_DELETE);
        IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
                .orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
        if (isolationTask.getStatus() != IsolationTaskStatus.OFFLINE.getCode()) {
            throw new ServiceException(ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE);
        }
    }

    /**
     * Validate a cancel request: permission, and the task must not already be cancelled (offline).
     */
    @Override
    public void checkCanCancelTaskIsolation(@NonNull User loginUser,
                                            long projectCode,
                                            ProcessInstance processInstance,
                                            @NonNull IsolationTask isolationTask) {
        if (processInstance == null) {
            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
        }
        Project project = projectService.queryByCode(loginUser, projectCode);
        checkCancelIsolationTaskAuth(loginUser, project, processInstance);
        if (isolationTask.getStatus() == IsolationTaskStatus.OFFLINE.getCode()) {
            throw new ServiceException(ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL);
        }
    }

    /**
     * A workflow instance in a ready_* state cannot take isolation operations.
     */
    private void checkWorkflowInstanceCanSubmitIsolateTask(@NonNull String workflowInstanceName,
                                                           ProcessInstance processInstance) {
        if (processInstance == null) {
            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceName);
        }
        if (processInstance.getState().typeIsReady()) {
            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT,
                    workflowInstanceName);
        }
    }

    /**
     * Each requested task must exist in the DAG, and none of its upstream tasks may already
     * carry an isolation task for the same workflow instance.
     */
    private void checkTaskNodeCanSubmitIsolateTask(@NonNull List<IsolationTaskExcelParseVO> voList,
                                                   @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
        for (IsolationTaskExcelParseVO vo : voList) {
            String taskCodeStr = Long.toString(vo.getTaskCode());
            // check if the taskNode exists in DAG
            if (!workflowDAG.containsNode(taskCodeStr)) {
                throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST, vo.getTaskName());
            }
            // check if any pre task has an online isolation task;
            // if so, we cannot create a new isolation task
            Set<String> allPreNodes = DagHelper.getAllPreNodes(taskCodeStr, workflowDAG);
            List<IsolationTask> isolationTasks = isolationTaskDao.queryByTaskCodes(vo.getWorkflowInstanceId(),
                    allPreNodes.stream().map(Long::parseLong).collect(Collectors.toList()));
            // todo: Do we need to support if the sub isolation task is offline?
            if (CollectionUtils.isNotEmpty(isolationTasks)) {
                throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK);
            }
        }
    }

    private void checkSubmitIsolateTaskAuth(@NonNull User loginUser,
                                            @NonNull Project project,
                                            @NonNull ProcessDefinitionLog processDefinitionLog) {
        if (processDefinitionLog.getProjectCode() != project.getCode()) {
            throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT);
        }
        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_SUBMIT);
    }

    private void checkOnlineIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
        if (processInstance == null) {
            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
        }
        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
                        processInstance.getProcessDefinitionVersion())
                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
        if (processDefinitionLog.getProjectCode() != project.getCode()) {
            throw new ServiceException(ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
        }
        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_ONLINE);
    }

    private void checkCancelIsolationTaskAuth(User loginUser, Project project, ProcessInstance processInstance) {
        if (processInstance == null) {
            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
        }
        ProcessDefinitionLog processDefinitionLog = processDefinitionLogDao
                .queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
                        processInstance.getProcessDefinitionVersion())
                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getName()));
        if (processDefinitionLog.getProjectCode() != project.getCode()) {
            // Fixed: previously threw the ONLINE-specific status here; use the cancel-specific
            // status so callers get an accurate error code for a cancel operation.
            throw new ServiceException(
                    org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR);
        }
        projectService.checkProjectAndAuth(loginUser, project, project.getCode(), ISOLATION_TASK_CANCEL);
    }
}
......@@ -2,11 +2,11 @@ package org.apache.dolphinscheduler.api.checker;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -14,13 +14,12 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.enums.Status.CLEAN_TASK_INSTANCE_ERROR_WORKFLOW_INSTANCE_IS_RUNNING;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROJECT_NOT_EXIST;
@Component
public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
@Autowired
private ProcessDefinitionDao processDefinitionDao;
private ProcessDefinitionLogDao processDefinitionLogDao;
@Autowired
private ProjectService projectService;
......@@ -30,16 +29,13 @@ public class WorkflowInstanceCheckerImpl implements WorkflowInstanceChecker {
if (processInstance == null) {
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST);
}
// todo: check permission
ProcessDefinition processDefinition =
processDefinitionDao.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode());
if (processDefinition == null) {
throw new ServiceException(PROCESS_DEFINE_NOT_EXIST, processInstance.getProcessDefinitionCode());
}
Project project = projectService.queryByCode(loginUser, processDefinition.getProjectCode());
if (project == null) {
throw new ServiceException(PROJECT_NOT_EXIST);
}
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogDao
.queryProcessDefinitionByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())
.orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST,
processInstance.getProcessDefinitionCode()));
Project project = projectService.queryByCode(loginUser, processDefinitionLog.getProjectCode());
projectService.checkProjectAndAuth(loginUser, project, project.getCode(), CLEAN_TASK_INSTANCE_STATE);
// check state
......
......@@ -96,6 +96,11 @@ public class ApiFuncIdentificationConstant {
public static final String INSTANCE_BATCH_DELETE = "project:process-instance:batch-delete";
public static final String FORCED_SUCCESS = "project:task-instance:force-success";
public static final String CLEAN_TASK_INSTANCE_STATE = "project:task-instance:clean-state";
public static final String ISOLATION_TASK_LIST = "project:isolation-task:list";
public static final String ISOLATION_TASK_ONLINE = "project:isolation-task:online";
public static final String ISOLATION_TASK_SUBMIT = "project:isolation-task:submit";
public static final String ISOLATION_TASK_CANCEL = "project:isolation-task:cancel";
public static final String ISOLATION_TASK_DELETE = "project:isolation-task:delete";
public static final String VIEW_LOG = "project:log:detail";
public static final String DOWNLOAD_LOG = "project:log:download-log";
public static final String PROJECT_OVERVIEW = "project:overview:view";
......
package org.apache.dolphinscheduler.api.controller;
import io.swagger.annotations.Api;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.IsolationTaskService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
import javax.validation.Valid;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_CANCEL_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_DELETE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_LISTING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_ONLINE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR;
@Api(tags = "ISOLATION_TASK_TAG")
@RestController
@RequestMapping("/projects/{projectCode}/isolation-task")
/**
 * REST endpoints for isolation-task operations under a project:
 * submit, online, cancel, listing and delete. All business rules and
 * permission checks are delegated to {@link IsolationTaskService}.
 */
public class IsolationTaskController {

    @Autowired
    private IsolationTaskService isolationTaskService;

    /**
     * Submit a batch of isolation tasks parsed from the uploaded excel rows.
     */
    @PostMapping(value = "/submit")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(ISOLATION_TASK_SUBMIT_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result<Void> submitIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                            @PathVariable long projectCode,
                                            @RequestBody IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
        isolationTaskService.submitTaskIsolations(loginUser, projectCode, isolationTaskSubmitRequest);
        return Result.success(null);
    }

    /**
     * Bring an existing isolation task online.
     */
    @PutMapping(value = "/online/{id}")
    @ResponseStatus(HttpStatus.OK)
    // Fixed: was ISOLATION_TASK_SUBMIT_ERROR (copy-paste); online failures should map
    // to the online-specific status code.
    @ApiException(ISOLATION_TASK_ONLINE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result<Void> onlineIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                            @PathVariable long projectCode,
                                            @PathVariable(name = "id") long isolationId) {
        isolationTaskService.onlineTaskIsolation(loginUser, projectCode, isolationId);
        return Result.success(null);
    }

    /**
     * Cancel (take offline) an existing isolation task.
     */
    @PutMapping(value = "/cancel/{id}")
    @ResponseStatus(HttpStatus.OK)
    // Fixed: was ISOLATION_TASK_SUBMIT_ERROR (copy-paste); cancel failures should map
    // to the cancel-specific status code.
    @ApiException(ISOLATION_TASK_CANCEL_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result<Void> cancelIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                            @PathVariable long projectCode,
                                            @PathVariable(name = "id") long isolationId) {
        isolationTaskService.cancelTaskIsolation(loginUser, projectCode, isolationId);
        return Result.success(null);
    }

    /**
     * Page through the project's isolation tasks.
     * <p>
     * NOTE(review): a GET endpoint reading filters from {@code @RequestBody} is unusual and
     * some HTTP clients will not send a body with GET — consider query parameters; kept as-is.
     */
    @GetMapping("")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(ISOLATION_TASK_LISTING_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result<PageInfo<IsolationTask>> listingIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                                                @PathVariable long projectCode,
                                                                @RequestBody @Valid IsolationTaskListingRequest request) {
        return Result.success(isolationTaskService.listingTaskIsolation(loginUser, projectCode, request));
    }

    /**
     * Delete an isolation task; the service layer requires it to be offline first.
     */
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(ISOLATION_TASK_DELETE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result<Void> deleteIsolationTask(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                            @PathVariable long projectCode,
                                            @PathVariable long id) {
        isolationTaskService.deleteTaskIsolation(loginUser, projectCode, id);
        return Result.success(null);
    }
}
......@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
......@@ -11,5 +12,6 @@ import java.util.List;
@AllArgsConstructor
public class CleanTaskInstanceStateRequest {
@NotEmpty
private List<Integer> taskInstanceIds;
}
package org.apache.dolphinscheduler.api.dto.request;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
/**
 * Request body for cancelling isolation tasks in batch.
 */
public class IsolationTaskCancelRequest {

    // Ids of the isolation tasks to cancel; must contain at least one element.
    @NotEmpty
    private List<Long> isolationIds;
}
package org.apache.dolphinscheduler.api.dto.request;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
/**
 * Request body for deleting isolation tasks in batch.
 */
public class IsolationTaskDeleteRequest {

    // Ids of the isolation tasks to delete; must contain at least one element.
    @NotEmpty
    private List<Long> isolationIds;
}
package org.apache.dolphinscheduler.api.dto.request;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
/**
 * Request body for listing isolation tasks with pagination and optional name filters.
 */
public class IsolationTaskListingRequest {

    // Optional filter by workflow instance name; null means no filtering
    // (exact vs. fuzzy matching is decided by the service layer — TODO confirm).
    private String workflowInstanceName;

    // Optional filter by task name; null means no filtering.
    private String taskName;

    // 1-based page number; required.
    @NotNull(message = "page number cannot be null")
    private Integer pageNo;

    // Page size; required.
    @NotNull(message = "page size cannot be null")
    private Integer pageSize;
}
package org.apache.dolphinscheduler.api.dto.request;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
import javax.validation.constraints.NotEmpty;
import java.util.List;
@Data
@NoArgsConstructor
@AllArgsConstructor
/**
 * Request body for submitting isolation tasks parsed from an excel upload.
 */
public class IsolationTaskSubmitRequest {

    // Parsed excel rows describing the tasks to isolate; must not be empty.
    @NotEmpty
    private List<IsolationTaskExcelParseVO> isolationTaskExcelParseVOList;
}
......@@ -513,7 +513,44 @@ public enum Status {
CONFIG_FILTER_EMPTY(1400007, "complement time is empty after filtering according to the configuration",
"当前补数时间根据配置过滤后为空"),
PROJECT_NAME_TOO_LONG_ERROR(1400008, "project name is too long error", "项目名称过长"),
;
ISOLATION_TASK_NOT_EXIST(1500000, "Isolation task not exist", "隔离任务不存在"),
ISOLATION_TASK_SUBMIT_ERROR(1500100, "Submit isolation task error", "提交隔离任务异常"),
ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_SUPPORT(1500101,
"Submit isolation task error, relate workflow instance [{0}] is not support",
"提交隔离任务异常, 关联的工作流实例:[{0}]暂不支持该操作"),
ISOLATION_TASK_SUBMIT_ERROR_WORKFLOW_INSTANCE_NOT_BELONG_TO_CURRENT_PROJECT(1500102,
"Submit isolation task error, relate workflow instance [{0}] is not belong to current project",
"提交隔离任务异常, 关联的工作流实例:[{0}]不属于当前项目"),
ISOLATION_TASK_SUBMIT_ERROR_TASK_NOT_EXIST(1500103, "Submit isolation task error, task: [{0}] is not exist",
"提交隔离任务异常, 任务不存在:[{0}]"),
ISOLATION_TASK_SUBMIT_ERROR_EXIST_SUB_ISOLATION_TASK(1500104,
"Submit isolation task error, workflow instance: [{0}] exist an sub isolation task",
"提交隔离任务异常, 工作流实例已经存在:[{0}]子隔离任务"),
ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500105,
"Submit isolation task error, send request to master error",
"提交隔离任务异常,发送请求给Master异常"),
ISOLATION_TASK_ONLINE_ERROR(1500200, "Online isolation task error", "上线隔离任务异常"),
ISOLATION_TASK_ONLINE_ERROR_ALREADY_ONLINE(1500201,
"Online isolation task error, the isolation task is already online", "上线隔离任务异常,该任务已处于隔离中"),
ISOLATION_TASK_ONLINE_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500202,
"Online isolation task error, send request to master error", "上线隔离任务异常,发送隔离请求给Master异常"),
ISOLATION_TASK_ONLINE_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500203,
"Online isolation task error, the process instance is not belongs to project", "上线隔离任务异常,当前工作流实例不属于目标任务"),
ISOLATION_TASK_CANCEL_ERROR(1500300, "Cancel isolation task error", "取消隔离任务异常"),
ISOLATION_TASK_CANCEL_ERROR_SEND_REQUEST_TO_MASTER_ERROR(1500301,
"Cancel isolation task error, send request to master error", "取消隔离任务异常,发送隔离请求给Master异常"),
ISOLATION_TASK_CANCEL_ERROR_THE_ISOLATION_ALREADY_CANCEL(1500302,
"Cancel isolation task error, this task isolation is already been cancel", "取消隔离任务异常,该隔离已经下线"),
ISOLATION_TASK_CANCEL_ERROR_PROCESS_NOT_BELONG_TO_PROJECT_ERROR(1500303,
"Cancel isolation task error, the process instance is not belongs to project", "取消隔离任务异常,当前工作流实例不属于目标任务"),
ISOLATION_TASK_LISTING_ERROR(1500400, "Listing isolation task error", "查询隔离任务列表异常"),
ISOLATION_TASK_DELETE_ERROR(1500500, "Delete isolation task error", "删除隔离任务异常"),
ISOLATION_TASK_DELETE_ERROR_IS_NOT_OFFLINE(1500501, "Delete isolation task error, the task status is not offline",
"删除隔离任务异常,该隔离任务尚未下线"),
;
private final int code;
private final String enMsg;
......
package org.apache.dolphinscheduler.api.remote;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.springframework.stereotype.Component;
@Slf4j
@Component
/**
 * Thin RPC client used by the API server to push commands (e.g. isolation-task
 * refresh requests) to other servers over the Netty remoting channel.
 */
public class ApiServerRPCClient {

    // Default timeout applied when the caller does not supply one.
    private static final long DEFAULT_TIME_OUT_MILLS = 1_000L;

    private final NettyRemotingClient client;

    public ApiServerRPCClient() {
        this.client = new NettyRemotingClient(new NettyClientConfig());
        log.info("Success initialized ApiServerRPCClient...");
    }

    /**
     * Send a command synchronously using the default timeout.
     *
     * @throws RemotingException    if the remote call fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void sendSyncCommand(@NonNull Host host,
                                @NonNull Command rpcCommand) throws RemotingException, InterruptedException {
        sendSyncCommand(host, rpcCommand, DEFAULT_TIME_OUT_MILLS);
    }

    /**
     * Send a command synchronously, waiting at most {@code timeoutMills} milliseconds.
     */
    public void sendSyncCommand(@NonNull Host host,
                                @NonNull Command rpcCommand,
                                long timeoutMills) throws RemotingException, InterruptedException {
        client.sendSync(host, rpcCommand, timeoutMills);
    }
}
package org.apache.dolphinscheduler.api.service;
import lombok.NonNull;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskCancelRequest;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.User;
/**
 * Service layer for isolation-task operations: submit, online, cancel,
 * listing and delete. Implementations are expected to perform permission
 * checks and to notify masters of isolation changes where needed.
 */
public interface IsolationTaskService {

    /**
     * Submit isolation tasks parsed from the request for the given project.
     */
    void submitTaskIsolations(@NonNull User loginUser,
                              long projectCode,
                              @NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest);

    /**
     * Bring an existing isolation task online.
     */
    void onlineTaskIsolation(@NonNull User loginUser,
                             long projectCode,
                             long isolationTaskId);

    /**
     * Cancel (take offline) an existing isolation task.
     */
    void cancelTaskIsolation(@NonNull User loginUser,
                             long projectCode,
                             long isolationId);

    /**
     * Page through the project's isolation tasks with the filters in the request.
     */
    PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
                                                 long projectCode,
                                                 @NonNull IsolationTaskListingRequest request);

    /**
     * Delete the isolation task with the given id.
     */
    void deleteTaskIsolation(@NonNull User loginUser,
                             long projectCode,
                             long id);
}
......@@ -18,16 +18,16 @@
package org.apache.dolphinscheduler.api.service;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* process instance service
*/
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import lombok.NonNull;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
......@@ -44,6 +45,7 @@ public interface ProjectService {
* @param projectCode project code
* @return project detail information
*/
@NonNull
Project queryByCode(User loginUser, long projectCode);
/**
......
package org.apache.dolphinscheduler.api.service;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
/**
 * Builds the task DAG of a workflow definition, keyed by task code (as String).
 */
public interface WorkflowDAGService {

    /**
     * Build the DAG for the given workflow definition code and version.
     */
    DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
                                                           @NonNull Integer processDefinitionVersion);

    /**
     * Build the DAG directly from an already-loaded definition log.
     */
    DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog);
}
package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.api.checker.IsolationTaskChecker;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskListingRequest;
import org.apache.dolphinscheduler.api.dto.request.IsolationTaskSubmitRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.remote.ApiServerRPCClient;
import org.apache.dolphinscheduler.api.service.IsolationTaskService;
import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
import org.apache.dolphinscheduler.api.transformer.CommandTransformer;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.vo.IsolationTaskExcelParseVO;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR;
/**
 * Default implementation of {@link IsolationTaskService}.
 * <p>
 * Persists isolation tasks through {@link IsolationTaskDao}, delegates all
 * permission/precondition checks to {@link IsolationTaskChecker}, and notifies
 * masters of isolation changes over RPC via {@link ApiServerRPCClient}.
 */
@Slf4j
@Service
public class IsolationTaskServiceImpl implements IsolationTaskService {

    @Autowired
    private IsolationTaskDao isolationTaskDao;

    @Autowired
    private IsolationTaskChecker isolationTaskChecker;

    @Autowired
    private ProcessInstanceDao processInstanceDao;

    @Autowired
    private CommandTransformer commandTransformer;

    @Autowired
    private ApiServerRPCClient apiServerRPCClient;

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private ProcessService processService;

    @Autowired
    private WorkflowDAGService workflowDAGService;

    @Autowired
    private TaskInstanceDao taskInstanceDao;
@Override
@Transactional
public void submitTaskIsolations(@NonNull User loginUser,
long projectCode,
@NonNull IsolationTaskSubmitRequest isolationTaskSubmitRequest) {
Map<Integer, List<IsolationTaskExcelParseVO>> workflow2VoMap =
isolationTaskSubmitRequest.getIsolationTaskExcelParseVOList()
.stream().collect(
HashMap::new, (map, vo) -> {
map.computeIfAbsent(vo.getWorkflowInstanceId(), k -> new ArrayList<>()).add(vo);
},
Map::putAll);
List<Pair<ProcessInstance, List<IsolationTask>>> needToOnlineIsolations = new ArrayList<>();
List<IsolationTask> needToInsertIntoDB = new ArrayList<>();
for (Map.Entry<Integer, List<IsolationTaskExcelParseVO>> entry : workflow2VoMap.entrySet()) {
Integer workflowInstanceId = entry.getKey();
List<IsolationTaskExcelParseVO> vos = entry.getValue();
ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
isolationTaskChecker.checkCanSubmitTaskIsolation(loginUser, projectCode, processInstance, vos);
List<IsolationTask> isolationTasks = entry.getValue().stream().map(vo -> {
return IsolationTask.builder()
.workflowInstanceId(vo.getWorkflowInstanceId())
.workflowInstanceName(vo.getWorkflowInstanceName())
.taskName(vo.getTaskName())
.taskCode(vo.getTaskCode())
.status(IsolationTaskStatus.ONLINE.getCode())
.build();
}).collect(Collectors.toList());
needToInsertIntoDB.addAll(isolationTasks);
needToOnlineIsolations.add(Pair.of(processInstance, isolationTasks));
}
isolationTaskDao.batchInsert(needToInsertIntoDB);
// we split here to avoid rollback RPC request
try {
refreshIsolationTasks();
} catch (Exception ex) {
throw new ServiceException(ISOLATION_TASK_SUBMIT_ERROR_SEND_REQUEST_TO_MASTER_ERROR);
}
}
@Override
@Transactional
public void onlineTaskIsolation(@NonNull User loginUser, long projectCode, long isolationTaskId) {
IsolationTask isolationTask = isolationTaskDao.queryById(isolationTaskId)
.orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
ProcessInstance processInstance =
processInstanceDao.queryProcessInstanceById(isolationTask.getWorkflowInstanceId());
isolationTaskChecker.checkCanOnlineTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
isolationTaskDao.updateIsolationTaskStatus(isolationTaskId, IsolationTaskStatus.ONLINE);
try {
refreshIsolationTasks();
} catch (Exception ex) {
throw new ServiceException(Status.ISOLATION_TASK_ONLINE_ERROR);
}
}
@Override
@Transactional
public void cancelTaskIsolation(@NonNull User loginUser,
long projectCode,
long isolationId) {
IsolationTask isolationTask = isolationTaskDao.queryById(isolationId)
.orElseThrow(() -> new ServiceException(ISOLATION_TASK_NOT_EXIST));
Integer workflowInstanceId = isolationTask.getWorkflowInstanceId();
ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
isolationTaskChecker.checkCanCancelTaskIsolation(loginUser, projectCode, processInstance, isolationTask);
isolationTaskDao.updateIsolationTaskStatus(isolationTask.getId(), IsolationTaskStatus.OFFLINE);
insertRecoveryCommandIfNeeded(processInstance, isolationTask);
try {
refreshIsolationTasks();
} catch (RemotingException | InterruptedException e) {
throw new ServiceException(Status.ISOLATION_TASK_CANCEL_ERROR);
}
}
@Override
public PageInfo<IsolationTask> listingTaskIsolation(@NonNull User loginUser,
long projectCode,
@NonNull IsolationTaskListingRequest request) {
isolationTaskChecker.checkCanListingTaskIsolation(loginUser, projectCode);
Integer pageNo = request.getPageNo();
Integer pageSize = request.getPageSize();
IPage<IsolationTask> iPage = isolationTaskDao.pageQueryIsolationTask(
request.getWorkflowInstanceName(),
request.getTaskName(),
pageNo,
pageSize);
PageInfo<IsolationTask> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) iPage.getTotal());
pageInfo.setTotalList(iPage.getRecords());
return pageInfo;
}
@Override
public void deleteTaskIsolation(@NonNull User loginUser, long projectCode, long id) {
isolationTaskChecker.checkCanDeleteTaskIsolation(loginUser, projectCode, id);
int deleteNum = isolationTaskDao.deleteByIdAndStatus(id, IsolationTaskStatus.OFFLINE);
if (deleteNum <= 0) {
throw new ServiceException(ISOLATION_TASK_NOT_EXIST);
}
}
private void refreshIsolationTasks() throws RemotingException, InterruptedException {
List<Server> masters = registryClient.getServerList(NodeType.MASTER);
if (CollectionUtils.isEmpty(masters)) {
return;
}
org.apache.dolphinscheduler.remote.command.Command refreshIsolationRequest =
new RefreshIsolationTaskRequest().convert2Command();
for (Server master : masters) {
try {
apiServerRPCClient.sendSyncCommand(new Host(master.getHost(), master.getPort()),
refreshIsolationRequest);
} catch (RemotingException | InterruptedException e) {
log.error("Send RefreshIsolationTask request to master error, master: {}", master, e);
throw e;
}
}
}
private void insertRecoveryCommandIfNeeded(@NonNull ProcessInstance processInstance,
@NonNull IsolationTask isolationTask) {
if (processInstance.getState() != ExecutionStatus.PAUSE_BY_ISOLATION) {
return;
}
int workflowInstanceId = processInstance.getId();
// find the isolationTaskInstanceIds need to recovery
// find the sub node is in pause or kill
DAG<String, TaskNode, TaskNodeRelation> workflowDAG = workflowDAGService.getWorkflowDAG(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
List<TaskInstance> taskInstances =
taskInstanceDao.queryValidatedTaskInstanceByWorkflowInstanceId(workflowInstanceId);
Set<String> onlineIsolationTaskCodes =
isolationTaskDao.queryByWorkflowInstanceId(workflowInstanceId, IsolationTaskStatus.ONLINE)
.stream()
.map(onlineIsolationTask -> String.valueOf(onlineIsolationTask.getTaskCode()))
.collect(Collectors.toSet());
List<TaskInstance> canRecoveryTaskInstances = taskInstances.stream()
.filter(taskInstance -> taskInstance.getState().typeIsIsolated())
.filter(taskInstance -> !DagHelper.isChildOfAnyParentNodes(String.valueOf(taskInstance.getTaskCode()),
onlineIsolationTaskCodes, workflowDAG))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(canRecoveryTaskInstances)) {
return;
}
// find if this taskInstance still exist pre isolationTasks
Command command = commandTransformer.transformToRecoveryFromTaskIsolationCommand(processInstance,
canRecoveryTaskInstances);
processService.createCommand(command);
}
}
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.NonNull;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
......@@ -168,7 +169,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
* @return project detail information
*/
@Override
public Project queryByCode(User loginUser, long projectCode) {
public @NonNull Project queryByCode(User loginUser, long projectCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
hasProjectAndPerm(loginUser, project, result, PROJECT);
......@@ -204,10 +205,8 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
perm)) {
// check read permission
Project checkProject = projectMapper.queryByCode(projectCode);
putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
throw new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
Objects.nonNull(checkProject) ? project.getName() : projectCode);
} else {
putMsg(result, Status.SUCCESS);
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
......@@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
......@@ -41,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -89,6 +92,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
private TaskInstanceDao taskInstanceDao;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private WorkflowInstanceChecker workflowInstanceChecker;
@Autowired
private CommandTransformer commandTransformer;
......@@ -219,7 +228,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
List<TaskInstance> existTaskInstances = taskInstanceDao.queryTaskInstanceByIds(taskInstanceIds);
TaskInstanceValidator.validateTaskInstanceAllExists(taskInstanceIds, existTaskInstances);
Map<Integer, List<Integer>> processInstanceId2TaskInstanceIds =
Map<Integer, List<Integer>> processInstanceId2TaskInstanceIdMap =
existTaskInstances.stream().collect(
HashMap::new,
(map, taskInstance) -> {
......@@ -228,11 +237,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
},
Map::putAll);
List<Command> cleanStateCommands = commandTransformer.transformToCleanTaskInstanceStateCommands(loginUser,
processInstanceId2TaskInstanceIds);
// todo: use batch create and remove the transactional
cleanStateCommands.forEach(command -> processService.createCommand(command));
for (Map.Entry<Integer, List<Integer>> processInstanceId2TaskInstanceIds : processInstanceId2TaskInstanceIdMap
.entrySet()) {
Integer workflowInstanceId = processInstanceId2TaskInstanceIds.getKey();
List<Integer> needToCleanStateTaskInstanceIds = processInstanceId2TaskInstanceIds.getValue();
ProcessInstance workflowInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, workflowInstance);
Command command = commandTransformer.transformToCleanTaskInstanceStateCommand(workflowInstance,
needToCleanStateTaskInstanceIds);
processService.createCommand(command);
}
} catch (ServiceException serviceException) {
throw serviceException;
} catch (Exception ex) {
......
package org.apache.dolphinscheduler.api.service.impl;
import lombok.NonNull;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkflowDAGService;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST;
@Service
public class WorkflowDagServiceImpl implements WorkflowDAGService {

    @Autowired
    private ProcessDefinitionLogDao processDefinitionLogDao;

    // todo: use dao
    @Autowired
    private ProcessService processService;

    /**
     * Build the DAG for the workflow definition identified by code and version.
     *
     * @throws ServiceException if no definition log exists for the given code/version
     */
    @Override
    public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull Long processDefinitionCode,
                                                                  @NonNull Integer processDefinitionVersion) {
        return processDefinitionLogDao
                .queryProcessDefinitionByCode(processDefinitionCode, processDefinitionVersion)
                .map(this::getWorkflowDAG)
                .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, processDefinitionCode));
    }

    /**
     * Build the DAG for the given workflow definition log by loading its task relations
     * and task definitions, generating the flow DAG, and converting it into a graph.
     *
     * @throws ServiceException if the generated flow DAG is empty
     */
    @Override
    public DAG<String, TaskNode, TaskNodeRelation> getWorkflowDAG(@NonNull ProcessDefinitionLog processDefinitionLog) {
        List<ProcessTaskRelation> taskRelations =
                processService.findRelationByCode(processDefinitionLog.getCode(), processDefinitionLog.getVersion());
        List<TaskDefinitionLog> taskDefinitions = processService.getTaskDefineLogListByRelation(taskRelations);
        List<TaskNode> taskNodes = processService.transformTask(taskRelations, taskDefinitions);
        ProcessDag flowDag = DagHelper.generateFlowDag(
                taskNodes,
                Collections.emptyList(),
                Collections.emptyList(),
                TaskDependType.TASK_POST);
        if (flowDag == null) {
            throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
        }
        return DagHelper.buildDagGraph(flowDag);
    }
}
......@@ -2,17 +2,16 @@ package org.apache.dolphinscheduler.api.transformer;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.List;
import java.util.Map;
public interface CommandTransformer {
List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
@NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds);
Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
@NonNull List<Integer> needCleanTaskInstanceIds);
Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
@NonNull Integer workflowInstanceId,
@NonNull List<Integer> needCleanTaskInstances);
Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
@NonNull List<TaskInstance> canRecoveryTaskInstances);
}
package org.apache.dolphinscheduler.api.transformer;
import lombok.NonNull;
import org.apache.dolphinscheduler.api.checker.WorkflowInstanceChecker;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS;
@Component
public class CommandTransformerImpl implements CommandTransformer {
@Autowired
private ProcessInstanceDao processInstanceDao;
@Override
public Command transformToCleanTaskInstanceStateCommand(@NonNull ProcessInstance processInstance,
@NonNull List<Integer> needCleanTaskInstanceIds) {
@Autowired
private WorkflowInstanceChecker workflowInstanceChecker;
Command command = new Command();
command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
command.setExecutorId(processInstance.getExecutorId());
command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
// we set the task post strategy, since we will go through the post task if the post task is not running
command.setTaskDependType(TaskDependType.TASK_POST);
command.setFailureStrategy(processInstance.getFailureStrategy());
command.setWarningType(processInstance.getWarningType());
command.setWarningGroupId(processInstance.getWarningGroupId());
command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
command.setWorkerGroup(processInstance.getWorkerGroup());
command.setEnvironmentCode(processInstance.getEnvironmentCode());
command.setDryRun(processInstance.getDryRun());
@Override
public List<Command> transformToCleanTaskInstanceStateCommands(@NonNull User loginUser,
@NonNull Map<Integer, List<Integer>> workflowInstanceId2TaskInstanceIds) {
List<Command> commands = new ArrayList<>(workflowInstanceId2TaskInstanceIds.size());
workflowInstanceId2TaskInstanceIds.forEach((workflowInstanceId, taskInstanceIds) -> {
Command command = transformToCleanTaskInstanceStateCommand(loginUser, workflowInstanceId, taskInstanceIds);
commands.add(command);
});
return commands;
// todo:use pojo to represent CommandParam rather than map
Map<String, String> commandParamMap = new HashMap<>();
commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstanceIds));
command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
command.setProcessInstanceId(processInstance.getId());
return command;
}
@Override
public Command transformToCleanTaskInstanceStateCommand(@NonNull User loginUser,
@NonNull Integer workflowInstanceId,
@NonNull List<Integer> needCleanTaskInstances) {
ProcessInstance processInstance = processInstanceDao.queryProcessInstanceById(workflowInstanceId);
workflowInstanceChecker.checkCanCleanTaskInstanceState(loginUser, processInstance);
public Command transformToRecoveryFromTaskIsolationCommand(@NonNull ProcessInstance processInstance,
@NonNull List<TaskInstance> canRecoveryIsolationTaskInstances) {
Command command = new Command();
command.setCommandType(CommandType.START_FROM_STATE_CLEAN_TASKS);
command.setCommandType(CommandType.RECOVERY_FROM_ISOLATION_TASKS);
command.setExecutorId(processInstance.getExecutorId());
command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
......@@ -62,10 +68,22 @@ public class CommandTransformerImpl implements CommandTransformer {
command.setDryRun(processInstance.getDryRun());
// todo:use pojo to represent CommandParam rather than map
List<Integer> recoveryPausedIsolationIds = canRecoveryIsolationTaskInstances.stream()
.filter(taskInstance -> taskInstance.getState().typeIsPauseByIsolation())
.map(TaskInstance::getId)
.collect(Collectors.toList());
List<Integer> recoveryKilledIsolationIds = canRecoveryIsolationTaskInstances.stream()
.filter(taskInstance -> taskInstance.getState().typeIsKilledByIsolation())
.map(TaskInstance::getId)
.collect(Collectors.toList());
Map<String, String> commandParamMap = new HashMap<>();
commandParamMap.put(CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS, JSONUtils.toJsonString(needCleanTaskInstances));
commandParamMap.put(CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS,
JSONUtils.toJsonString(recoveryPausedIsolationIds));
commandParamMap.put(CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS,
JSONUtils.toJsonString(recoveryKilledIsolationIds));
command.setCommandParam(JSONUtils.toJsonString(commandParamMap));
command.setProcessInstanceId(processInstance.getId());
return command;
}
}
package org.apache.dolphinscheduler.api.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotNull;
/**
 * Value object representing one row parsed from the isolation-task submission excel file.
 * Bean-validation constraints reject rows with missing cells before they are processed.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IsolationTaskExcelParseVO {

    // Id of the workflow instance that contains the task to isolate.
    @NotNull(message = "Workflow instance id cannot be null")
    private Integer workflowInstanceId;

    // Name of the workflow instance; carried along for display/search.
    @NotNull(message = "Workflow instance name cannot be null")
    private String workflowInstanceName;

    // Code of the task definition to isolate.
    @NotNull(message = "Task code cannot be null")
    private Long taskCode;

    // Name of the task to isolate.
    @NotNull(message = "Task name cannot be null")
    private String taskName;
}
......@@ -338,6 +338,9 @@ public final class Constants {
public static final String CMD_PARAM_CLEAN_STATE_TASK_INSTANCE_IDS = "CleanStateTaskInstanceIds";
public static final String CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS = "RecoveryPausedIsolationTaskInstanceIds";
public static final String CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS = "RecoveryKilledIsolationTaskInstanceIds";
public static final String CMD_PARAM_START_PARAMS = "StartParams";
public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
......@@ -859,5 +862,5 @@ public final class Constants {
* file upload verify
*/
public static final String FILE_TYPE_RESTRICTED_LIST = "file.type.restricted.list";
public static final String FILE_NAME_RESTRICTED_CONTENT= "file.name.restricted.content";
public static final String FILE_NAME_RESTRICTED_CONTENT = "file.name.restricted.content";
}
......@@ -55,6 +55,8 @@ public enum CommandType {
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
START_FROM_STATE_CLEAN_TASKS(12, "start from state clean tasks"),
RECOVERY_FROM_ISOLATION_TASKS(13, "recovery from isolation tasks"),
;
CommandType(int code, String descp) {
......
package org.apache.dolphinscheduler.dao.dto;
/**
 * Status of an isolation task: ONLINE means the isolation is currently in effect,
 * OFFLINE means it has been cancelled. The numeric code is what is persisted in the
 * {@code status} column of {@code t_ds_isolation_task}.
 */
public enum IsolationTaskStatus {

    ONLINE(0),
    OFFLINE(1),
    ;

    private final int code;

    IsolationTaskStatus(int code) {
        this.code = code;
    }

    /** @return the persisted numeric code of this status */
    public int getCode() {
        return code;
    }

    /**
     * Resolve a status from its persisted numeric code.
     *
     * @throws IllegalArgumentException if the code does not map to any status
     */
    public static IsolationTaskStatus of(int code) {
        for (IsolationTaskStatus value : IsolationTaskStatus.values()) {
            if (value.getCode() == code) {
                return value;
            }
        }
        // Fixed message: the original read "... is invalidated: " with a dangling colon.
        throw new IllegalArgumentException(String.format("Invalid isolation task status code: %d", code));
    }
}
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
 * Entity mapped to the {@code t_ds_isolation_task} table; one row per isolated task
 * within a workflow instance.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_isolation_task")
public class IsolationTask {

    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    // Id of the workflow instance that contains the isolated task.
    private int workflowInstanceId;

    // Workflow instance name, denormalized for listing/search.
    private String workflowInstanceName;

    // Name of the isolated task, denormalized for listing/search.
    private String taskName;

    // Task definition code of the isolated task.
    private long taskCode;

    // Isolation status code; see IsolationTaskStatus (0 = ONLINE, 1 = OFFLINE).
    private int status;

    private Date createTime;

    private Date updateTime;
}
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
 * MyBatis mapper for the {@code t_ds_isolation_task} table.
 */
public interface IsolationTaskMapper extends BaseMapper<IsolationTask> {

    /**
     * Page query filtered by (optional) workflow instance name and task name.
     */
    IPage<IsolationTask> pageQuery(@Param("workflowInstanceName") String workflowInstanceName,
                                   @Param("taskName") String taskName,
                                   IPage<IsolationTask> page);

    /**
     * Query the isolation tasks of a workflow instance whose task code is in {@code taskCodes}.
     */
    List<IsolationTask> queryByTaskCodes(@NonNull @Param("workflowInstanceId") Integer workflowInstanceId,
                                         @NonNull @Param("taskCodes") List<Long> taskCodes);

    /**
     * Update the status column of the given isolation task.
     */
    void updateIsolationTaskStatus(@Param("id") long isolationTaskId,
                                   @Param("status") int status);

    /**
     * Delete the row only when it has the given status; returns the number of rows deleted.
     */
    int deleteByIdAndStatus(@Param("id") long isolationTaskId,
                            @Param("status") int status);

    /**
     * Query all isolation tasks of a workflow instance with the given status code.
     */
    List<IsolationTask> queryByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId,
                                                  @Param("status") int status);

    /**
     * Insert all given isolation tasks in a single statement.
     */
    void batchInsert(@Param("isolationTasks") List<IsolationTask> isolationTasks);

    /**
     * Query all isolation tasks with the given status code.
     */
    List<IsolationTask> queryByStatus(@Param("status") int code);
}
package org.apache.dolphinscheduler.dao.repository;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
 * Repository abstraction over {@link IsolationTask} persistence; wraps the mapper and
 * translates {@link IsolationTaskStatus} enums to their persisted codes.
 */
public interface IsolationTaskDao {

    /**
     * Page query, pageNumber is start from 0.
     */
    IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
                                                String taskName,
                                                int pageNumber,
                                                int pageSize);

    /** Query the isolation tasks of a workflow instance whose task code is in {@code taskCodes}. */
    List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes);

    /** Query the isolation tasks of a workflow instance with the given status. */
    List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId, IsolationTaskStatus isolationTaskStatus);

    /** @return the isolation task with the given id, or empty when it does not exist */
    Optional<IsolationTask> queryById(long isolationTaskId);

    /** Query isolation tasks by id list. */
    List<IsolationTask> queryByIds(List<Long> isolationTaskIds);

    /** Query all isolation tasks with the given status. */
    List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus);

    /** Delete the row only when it currently has the given status; returns rows deleted. */
    int deleteByIdAndStatus(long id, IsolationTaskStatus status);

    /** Insert a single isolation task. */
    void insert(IsolationTask isolationTask);

    /** Update the status of the given isolation task. */
    void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus);

    /** Insert all given isolation tasks; a no-op for an empty list. */
    void batchInsert(List<IsolationTask> isolationTasks);
}
package org.apache.dolphinscheduler.dao.repository;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import java.util.Optional;
/**
 * Repository abstraction for querying workflow definition logs.
 */
public interface ProcessDefinitionLogDao {

    /**
     * Look up a workflow definition log by code and version.
     *
     * @return the matching definition log, or empty when none exists
     */
    Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
                                                                @NonNull Integer processDefinitionVersion);
}
......@@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
* @param processInstance processInstance
*/
int upsertProcessInstance(ProcessInstance processInstance);
}
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import java.util.List;
/**
 * Repository abstraction for querying workflow task-relation logs.
 */
public interface ProcessTaskRelationLogDao {

    /**
     * Query all task-relation logs of the given workflow definition code and version.
     */
    List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(long workflowDefinitionCode,
                                                                       int workflowDefinitionVersion);
}
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import java.util.List;
/**
 * Repository abstraction for querying task definition logs.
 */
public interface TaskDefinitionLogDao {

    /**
     * Resolve the task definition logs referenced (as pre or post task) by the given relations.
     */
    List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations);
}
......@@ -10,6 +10,8 @@ public interface TaskInstanceDao {
List<TaskInstance> queryTaskInstanceByIds(List<Integer> taskInstanceId);
List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId);
/**
* Update the taskInstance, if update failed will throw exception.
*
......
package org.apache.dolphinscheduler.dao.repository.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@Slf4j
@Repository
public class IsolationTaskDaoImpl implements IsolationTaskDao {

    @Autowired
    private IsolationTaskMapper isolationTaskMapper;

    /**
     * Page query, pageNumber is start from 0.
     */
    @Override
    public IPage<IsolationTask> pageQueryIsolationTask(String workflowInstanceName,
                                                       String taskName,
                                                       int pageNumber,
                                                       int pageSize) {
        Page<IsolationTask> page = new Page<>(pageNumber, pageSize);
        return isolationTaskMapper.pageQuery(workflowInstanceName, taskName, page);
    }

    @Override
    public List<IsolationTask> queryByTaskCodes(Integer workflowInstanceId, List<Long> taskCodes) {
        // Short-circuit: an empty IN-clause is invalid SQL.
        if (CollectionUtils.isEmpty(taskCodes)) {
            return Collections.emptyList();
        }
        return isolationTaskMapper.queryByTaskCodes(workflowInstanceId, taskCodes);
    }

    @Override
    public List<IsolationTask> queryByWorkflowInstanceId(Integer workflowInstanceId,
                                                         IsolationTaskStatus isolationTaskStatus) {
        return isolationTaskMapper.queryByWorkflowInstanceId(workflowInstanceId, isolationTaskStatus.getCode());
    }

    @Override
    public Optional<IsolationTask> queryById(long isolationTaskId) {
        return Optional.ofNullable(isolationTaskMapper.selectById(isolationTaskId));
    }

    @Override
    public List<IsolationTask> queryByIds(List<Long> isolationTaskIds) {
        // Guard against an empty id list: MyBatis-Plus selectBatchIds cannot build an
        // IN-clause from an empty collection (consistent with queryByTaskCodes above).
        if (CollectionUtils.isEmpty(isolationTaskIds)) {
            return Collections.emptyList();
        }
        return isolationTaskMapper.selectBatchIds(isolationTaskIds);
    }

    @Override
    public List<IsolationTask> queryByStatus(@NonNull IsolationTaskStatus isolationTaskStatus) {
        return isolationTaskMapper.queryByStatus(isolationTaskStatus.getCode());
    }

    @Override
    public int deleteByIdAndStatus(long id, IsolationTaskStatus status) {
        return isolationTaskMapper.deleteByIdAndStatus(id, status.getCode());
    }

    @Override
    public void insert(IsolationTask isolationTask) {
        isolationTaskMapper.insert(isolationTask);
    }

    @Override
    public void updateIsolationTaskStatus(long isolationTaskId, IsolationTaskStatus isolationTaskStatus) {
        isolationTaskMapper.updateIsolationTaskStatus(isolationTaskId, isolationTaskStatus.getCode());
    }

    @Override
    public void batchInsert(List<IsolationTask> isolationTasks) {
        // No-op for an empty list; batch insert SQL requires at least one row.
        if (CollectionUtils.isEmpty(isolationTasks)) {
            return;
        }
        isolationTaskMapper.batchInsert(isolationTasks);
    }
}
package org.apache.dolphinscheduler.dao.repository.impl;
import lombok.NonNull;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {

    @Autowired
    private ProcessDefinitionLogMapper processDefinitionLogMapper;

    /**
     * Look up a workflow definition log by code and version.
     *
     * @return the matching definition log, or empty when none exists
     */
    @Override
    public Optional<ProcessDefinitionLog> queryProcessDefinitionByCode(@NonNull Long processDefinitionCode,
                                                                       @NonNull Integer processDefinitionVersion) {
        ProcessDefinitionLog definitionLog =
                processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode,
                        processDefinitionVersion);
        return Optional.ofNullable(definitionLog);
    }
}
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao {

    @Autowired
    private ProcessTaskRelationLogMapper processTaskRelationLogMapper;

    /**
     * Query all task-relation logs of the given workflow definition code and version.
     */
    @Override
    public List<ProcessTaskRelationLog> queryProcessTaskRelationLogByWorkflow(
                                                                              long workflowDefinitionCode,
                                                                              int workflowDefinitionVersion) {
        List<ProcessTaskRelationLog> relationLogs = processTaskRelationLogMapper
                .queryByProcessCodeAndVersion(workflowDefinitionCode, workflowDefinitionVersion);
        return relationLogs;
    }
}
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Repository
public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {

    @Autowired
    private TaskDefinitionLogMapper taskDefinitionLogMapper;

    /**
     * Resolve the task definition logs referenced (as pre or post task) by the given
     * relations, deduplicated by task code with the version taken from the relation.
     */
    @Override
    public List<TaskDefinitionLog> queryTaskDefinitionByRelations(List<ProcessTaskRelationLog> processTaskRelations) {
        if (CollectionUtils.isEmpty(processTaskRelations)) {
            return new ArrayList<>();
        }
        // Collect each referenced task code once, keeping the version from the relation.
        Map<Long, Integer> versionByTaskCode = new HashMap<>();
        for (ProcessTaskRelation relation : processTaskRelations) {
            if (relation.getPreTaskCode() > 0) {
                versionByTaskCode.put(relation.getPreTaskCode(), relation.getPreTaskVersion());
            }
            if (relation.getPostTaskCode() > 0) {
                versionByTaskCode.put(relation.getPostTaskCode(), relation.getPostTaskVersion());
            }
        }
        List<TaskDefinitionLog> definitionLogs = new ArrayList<>(versionByTaskCode.size());
        versionByTaskCode.forEach((taskCode, version) -> definitionLogs
                .add(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version)));
        return definitionLogs;
    }
}
......@@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.exception.RepositoryException;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
......@@ -28,6 +29,11 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
return taskInstanceMapper.selectBatchIds(taskInstanceIds);
}
@Override
public List<TaskInstance> queryValidatedTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
    // Only task instances flagged Flag.YES belong to the current run of the workflow instance.
    final List<TaskInstance> validTaskInstances =
            taskInstanceMapper.findValidTaskListByProcessId(workflowInstanceId, Flag.YES);
    return validTaskInstances;
}
@Override
public void updateTaskInstance(@NonNull TaskInstance taskInstance) throws RepositoryException {
try {
......
......@@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.dao.utils;
import com.google.common.collect.Sets;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
......@@ -30,9 +32,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
......@@ -41,12 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
/**
* dag tools
*/
......@@ -591,4 +589,89 @@ public class DagHelper {
}
return false;
}
/**
 * Collect the given node together with every node transitively reachable from it
 * through outgoing edges of the DAG.
 * <p>
 * e.g. the given DAG is A -> B -> C, and taskNode is A, return result is A,B,C
 *
 * @param taskNode    starting node name; it is always contained in the result
 * @param workflowDAG the workflow DAG to traverse
 * @return the node itself plus all of its direct and indirect successors
 */
public static Set<String> getAllPostNodes(@NonNull String taskNode,
                                          @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
    Set<String> reachable = new HashSet<>();
    Set<String> visited = new HashSet<>();
    Set<String> frontier = Sets.newHashSet(taskNode);
    // Breadth-first traversal over the successor relation, one level per loop turn.
    while (!frontier.isEmpty()) {
        Set<String> nextFrontier = new HashSet<>();
        for (String node : frontier) {
            reachable.add(node);
            visited.add(node);
            for (String successor : workflowDAG.getSubsequentNodes(node)) {
                if (!visited.contains(successor)) {
                    nextFrontier.add(successor);
                }
            }
        }
        frontier = nextFrontier;
    }
    return reachable;
}
/**
* Get all pre nodes of the given node.
* <p>
* e.g. the given DAG is A -> B -> C, and taskNode is B, return result is A,B
*/
/**
 * Collect the given node together with every node from which it is transitively
 * reachable (i.e. the node itself plus all direct and indirect predecessors).
 * <p>
 * e.g. the given DAG is A -> B -> C, and taskNode is B, return result is A,B
 *
 * @param taskNode    starting node name; it is always contained in the result
 * @param workflowDAG the workflow DAG to traverse
 * @return the node itself plus all of its ancestors
 */
public static Set<String> getAllPreNodes(@NonNull String taskNode,
                                         @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
    Set<String> reachable = new HashSet<>();
    Set<String> visited = new HashSet<>();
    Set<String> frontier = Sets.newHashSet(taskNode);
    // Breadth-first traversal over the predecessor relation, one level per loop turn.
    while (!frontier.isEmpty()) {
        Set<String> nextFrontier = new HashSet<>();
        for (String node : frontier) {
            reachable.add(node);
            visited.add(node);
            for (String predecessor : workflowDAG.getPreviousNodes(node)) {
                if (!visited.contains(predecessor)) {
                    nextFrontier.add(predecessor);
                }
            }
        }
        frontier = nextFrontier;
    }
    return reachable;
}
/**
* Judge if the givenTaskNode is the child of any given parentNodes or is belongs to parentNodes.
*/
/**
 * Judge if the givenTaskNode is the child of any given parentNodes or is belongs to parentNodes.
 * <p>
 * Returns true when {@code givenTaskNode} is a member of {@code parentNodes} or is
 * reachable from any of them through the DAG's outgoing edges.
 *
 * @param givenTaskNode node to test
 * @param parentNodes   candidate ancestor nodes (the set itself is not mutated)
 * @param workflowDAG   the workflow DAG to traverse
 * @return true if givenTaskNode belongs to or descends from any parent node
 */
public static boolean isChildOfAnyParentNodes(@NonNull String givenTaskNode,
                                              @NonNull Set<String> parentNodes,
                                              @NonNull DAG<String, TaskNode, TaskNodeRelation> workflowDAG) {
    if (CollectionUtils.isEmpty(parentNodes)) {
        return false;
    }
    if (parentNodes.contains(givenTaskNode)) {
        return true;
    }
    // BFS from all parents at once; stop as soon as the target shows up instead of
    // materializing the full reachable set first (the previous implementation always
    // walked the whole sub-graph before checking membership).
    Set<String> visited = new HashSet<>();
    Set<String> currentLoopNodes = parentNodes;
    while (CollectionUtils.isNotEmpty(currentLoopNodes)) {
        Set<String> nextLoopNodes = new HashSet<>();
        for (String currentNode : currentLoopNodes) {
            visited.add(currentNode);
            for (String postNode : workflowDAG.getSubsequentNodes(currentNode)) {
                if (givenTaskNode.equals(postNode)) {
                    return true;
                }
                if (!visited.contains(postNode)) {
                    nextLoopNodes.add(postNode);
                }
            }
        }
        currentLoopNodes = nextLoopNodes;
    }
    return false;
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.IsolationTaskMapper">
<!-- Shared column list for t_ds_isolation_task selects; keep in sync with the table DDL. -->
<sql id="baseSql">
    id
    ,workflow_instance_id, workflow_instance_name, task_name, task_code, status, create_time, update_time
</sql>
<!-- Paged listing of isolation tasks; filters are exact matches and only applied when non-null.
     resultType was "string", which cannot map the full entity column list selected below. -->
<select id="pageQuery" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
    select
    <include refid="baseSql"/>
    from t_ds_isolation_task
    where 1 = 1
    <if test="workflowInstanceName != null">
        and workflow_instance_name = #{workflowInstanceName}
    </if>
    <if test="taskName != null">
        and task_name = #{taskName}
    </if>
</select>
<!-- Query isolation rows of one workflow instance restricted to the given task codes. -->
<select id="queryByTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
    select
    <include refid="baseSql"/>
    from t_ds_isolation_task
    where workflow_instance_id = #{workflowInstanceId}
    and task_code in
    <foreach collection="taskCodes" item="taskCode" open="(" separator="," close=")">
        #{taskCode}
    </foreach>
</select>
<!-- Query isolation rows of one workflow instance in a given status (e.g. ONLINE). -->
<select id="queryByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
    select
    <include refid="baseSql"/>
    from t_ds_isolation_task
    where workflow_instance_id = #{workflowInstanceId} and status = #{status}
</select>
<!-- Query all isolation rows in a given status across workflow instances. -->
<select id="queryByStatus" resultType="org.apache.dolphinscheduler.dao.entity.IsolationTask">
    select
    <include refid="baseSql"/>
    from t_ds_isolation_task
    where status = #{status}
</select>
<!-- Flip the status of a single isolation row (ONLINE <-> CANCEL etc.). -->
<update id="updateIsolationTaskStatus">
    update t_ds_isolation_task
    set status = #{status}
    where id = #{id}
</update>
<!-- Bulk insert; create_time/update_time rely on the column defaults. -->
<insert id="batchInsert">
    insert into t_ds_isolation_task (
    workflow_instance_id,
    workflow_instance_name,
    task_name,
    task_code,
    status
    )
    values
    <foreach collection="isolationTasks" item="isolationTask" separator=",">
        (
        #{isolationTask.workflowInstanceId},
        #{isolationTask.workflowInstanceName},
        #{isolationTask.taskName},
        #{isolationTask.taskCode},
        #{isolationTask.status}
        )
    </foreach>
</insert>
<!-- Delete only when the row is still in the expected status (optimistic guard). -->
<delete id="deleteByIdAndStatus">
    delete
    from t_ds_isolation_task
    where id = #{id}
    and status = #{status}
</delete>
</mapper>
......@@ -1977,3 +1977,20 @@ CREATE TABLE t_ds_fav
user_id int NOT NULL,
PRIMARY KEY (id)
);
-- ----------------------------
-- Table structure for t_ds_isolation_task
-- ----------------------------
-- NOTE(review): backticks, AUTO_INCREMENT and tinyint(4) are MySQL-flavored syntax;
-- confirm the dialect of this script (the surrounding tables use unquoted identifiers).
-- NOTE(review): unlike the MySQL script, no secondary index on workflow_instance_id here — confirm intended.
DROP TABLE if exists `t_ds_isolation_task`;
CREATE TABLE `t_ds_isolation_task`
(
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task',
    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
    `task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
    `task_code` bigint NOT NULL COMMENT 'isolation task code',
    `status` tinyint(4) DEFAULT '0',
    -- timestamps maintained by column defaults; no application writes needed
    `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
);
......@@ -1896,22 +1896,22 @@ CREATE TABLE `t_ds_k8s` (
-- Table structure for t_ds_k8s_namespace
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_k8s_namespace`;
CREATE TABLE `t_ds_k8s_namespace` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`limits_memory` int(11) DEFAULT NULL,
`namespace` varchar(100) DEFAULT NULL,
`online_job_num` int(11) DEFAULT NULL,
`owner` varchar(100) DEFAULT NULL,
`pod_replicas` int(11) DEFAULT NULL,
`pod_request_cpu` decimal(14,3) DEFAULT NULL,
`pod_request_memory` int(11) DEFAULT NULL,
`tag` varchar(100) DEFAULT NULL,
`limits_cpu` decimal(14,3) DEFAULT NULL,
`k8s` varchar(100) DEFAULT NULL,
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
CREATE TABLE `t_ds_k8s_namespace`(
`id` int(11) NOT NULL AUTO_INCREMENT,
`limits_memory` int(11) DEFAULT NULL,
`namespace` varchar(100) DEFAULT NULL,
`online_job_num` int(11) DEFAULT NULL,
`owner` varchar(100) DEFAULT NULL,
`pod_replicas` int(11) DEFAULT NULL,
`pod_request_cpu` decimal(14, 3) DEFAULT NULL,
`pod_request_memory` int(11) DEFAULT NULL,
`tag` varchar(100) DEFAULT NULL,
`limits_cpu` decimal(14, 3) DEFAULT NULL,
`k8s` varchar(100) DEFAULT NULL,
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
......@@ -1919,21 +1919,22 @@ CREATE TABLE `t_ds_k8s_namespace` (
-- Table structure for t_ds_alert_send_status
-- ----------------------------
DROP TABLE IF EXISTS t_ds_alert_send_status;
CREATE TABLE t_ds_alert_send_status(
`id` int(11) NOT NULL AUTO_INCREMENT,
`alert_id` int(11) NOT NULL,
`alert_plugin_instance_id` int(11) NOT NULL,
`send_status` tinyint(4) DEFAULT '0',
`log` text,
`create_time` datetime DEFAULT NULL COMMENT 'create time',
PRIMARY KEY (`id`),
UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `t_ds_fav_task`;
-- ----------------------------
-- Table structure for t_ds_fav_task
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_fav_task`;
CREATE TABLE t_ds_alert_send_status
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`alert_id` int(11) NOT NULL,
`alert_plugin_instance_id` int(11) NOT NULL,
`send_status` tinyint(4) DEFAULT '0',
`log` text,
`create_time` datetime DEFAULT NULL COMMENT 'create time',
PRIMARY KEY (`id`),
UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `t_ds_fav_task`
(
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'favorite task id',
......@@ -1943,3 +1944,21 @@ CREATE TABLE `t_ds_fav_task`
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
-- ----------------------------
-- Table structure for t_ds_isolation_task
-- ----------------------------
DROP TABLE if exists `t_ds_isolation_task`;
CREATE TABLE `t_ds_isolation_task`
(
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task',
    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
    `task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
    `task_code` bigint NOT NULL COMMENT 'isolation task code',
    -- status maps to IsolationTaskStatus (0 = default/online); see dao dto
    `status` tinyint(4) DEFAULT '0',
    -- timestamps maintained by column defaults; no application writes needed
    `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    -- secondary index: lookups are by workflow_instance_id (see mapper queries)
    KEY `workflow_instance_index` (`workflow_instance_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
......@@ -1941,3 +1941,21 @@ CREATE TABLE t_ds_fav
user_id int NOT NULL,
PRIMARY KEY (id)
);
-- ----------------------------
-- Table structure for t_ds_isolation_task
-- ----------------------------
DROP TABLE if exists `t_ds_isolation_task`;
CREATE TABLE `t_ds_isolation_task`
(
    `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    `workflow_instance_id` bigint NOT NULL COMMENT 'workflowInstanceId of the isolation task',
    `workflow_instance_name` varchar(255) NOT NULL COMMENT 'workflowInstanceName of the isolation task',
    `task_name` varchar(255) NOT NULL COMMENT 'isolation task name',
    `task_code` bigint NOT NULL COMMENT 'isolation task code',
    `status` tinyint(4) DEFAULT '0',
    `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
);
-- The original `CONSTRAINT workflow_instance_index (workflow_instance_id)` is invalid DDL
-- (a constraint requires a kind such as UNIQUE or FOREIGN KEY). The intent — matching the
-- MySQL script — is a plain secondary index on workflow_instance_id:
CREATE INDEX `workflow_instance_index` ON `t_ds_isolation_task` (`workflow_instance_id`);
......@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.dao.utils;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
......@@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
......@@ -40,10 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
/**
* dag helper test
......@@ -335,9 +334,8 @@ public class DagHelperTest {
* 2->8->5->7
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
*/
private DAG<String, TaskNode, TaskNodeRelation> generateDag() throws IOException {
private DAG<String, TaskNode, TaskNodeRelation> generateDag() {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node1 = new TaskNode();
node1.setId("1");
......@@ -529,4 +527,23 @@ public class DagHelperTest {
Assert.assertNotNull(dag);
}
@Test
public void getAllPostNodes() {
    // DAG layout documented on generateDag(); "2" reaches 3,5,6,7,8 plus itself.
    DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
    Set<String> expected = Sets.newHashSet("2", "3", "5", "6", "7", "8");
    Assert.assertEquals(expected, DagHelper.getAllPostNodes("2", dag));
}
@Test
public void getAllPreNodes() {
    // Only "1" precedes "2"; the node itself is included in the result.
    DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
    Set<String> allPreNodes = DagHelper.getAllPreNodes("2", dag);
    Assert.assertEquals(Sets.newHashSet("1", "2"), allPreNodes);
}
@Test
public void isChildOfAnyParentNodes() {
    // "2" is a direct successor of "1", so it descends from the parent set {1}.
    DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
    boolean isChild = DagHelper.isChildOfAnyParentNodes("2", Sets.newHashSet("1"), dag);
    Assert.assertTrue(isChild);
}
}
package org.apache.dolphinscheduler.server.master.processor.isolation;

import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Netty processor that handles {@link CommandType#REFRESH_ISOLATION_REQUEST} by
 * reloading the master's in-memory isolation task map from the database.
 */
@Slf4j
@Component
public class RefreshIsolationTaskProcessor implements NettyRequestProcessor {

    @Autowired
    private IsolationTaskManager isolationTaskManager;

    @Override
    public void process(Channel channel, Command command) {
        final CommandType commandType = command.getType();
        // Guard: this processor is registered for exactly one command type.
        if (CommandType.REFRESH_ISOLATION_REQUEST != commandType) {
            throw new IllegalArgumentException(String.format("The current rpc command: %s is invalidated", command));
        }
        isolationTaskManager.refreshIsolationTaskMapFromDB();
    }
}
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.isolation.RefreshIsolationTaskRequest;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
......@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingData
import javax.annotation.PostConstruct;
import org.apache.dolphinscheduler.server.master.processor.isolation.RefreshIsolationTaskProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -74,6 +76,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
@Autowired
private RefreshIsolationTaskProcessor refreshIsolationTaskProcessor;
public void start() {
logger.info("Starting Master RPC Server...");
// init remoting server
......@@ -96,6 +101,9 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REFRESH_ISOLATION_REQUEST,
refreshIsolationTaskProcessor);
this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
}
......
......@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.IsolationTaskManager;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -42,6 +43,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
@Autowired
private MasterFailoverService masterFailoverService;
@Autowired
private IsolationTaskManager isolationTaskManager;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
}
......@@ -66,6 +70,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
masterFailoverService.checkMasterFailover();
isolationTaskManager.refreshIsolationTaskMapFromDB();
} catch (Exception e) {
logger.error("Master failover thread execute error", e);
} finally {
......
......@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
......@@ -108,6 +109,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private StateEventCallbackService stateEventCallbackService;
@Autowired
private IsolationTaskDao isolationTaskDao;
private String masterAddress;
protected MasterSchedulerBootstrap() {
......@@ -184,7 +188,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
curingGlobalParamsService,
isolationTaskDao);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
......
......@@ -37,8 +37,10 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
......@@ -47,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
......@@ -188,6 +191,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final Set<String> stateCleanTaskCodes = new HashSet<>();
// isolationTaskCode -> isolatedTimes
private final Map<Long, Integer> isolationTaskCodesToTimesMap = new HashMap<>();
/**
* state event queue
*/
......@@ -208,6 +214,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final CuringParamsService curingParamsService;
private final IsolationTaskDao isolationTaskDao;
private final String masterAddress;
/**
......@@ -227,7 +235,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull ProcessAlertManager processAlertManager,
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService) {
@NonNull CuringParamsService curingParamsService,
@NonNull IsolationTaskDao isolationTaskDao) {
this.processService = processService;
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
......@@ -235,6 +244,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingParamsService = curingParamsService;
this.isolationTaskDao = isolationTaskDao;
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
......@@ -435,6 +445,84 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
/**
 * Bring an isolation online for the given task: the task itself and every
 * downstream task in the DAG get their isolation reference count incremented,
 * and any already-active task processor among them is asked to ISOLATE.
 * A TASK_STATE_CHANGE event is queued for each isolated active task so the
 * workflow thread re-evaluates the instance state.
 *
 * @param taskCode task code of the task being isolated
 */
public void onlineTaskIsolation(long taskCode) {
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        // find the post running task
        Set<String> needToOnlineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
        // if the current task is finished, kill the post task
        // we need to submit an online isolation task event, otherwise there may exist concurrent problem
        for (String isolationTaskCodeStr : needToOnlineIsolationTaskCodes) {
            Long isolationTaskCode = Long.valueOf(isolationTaskCodeStr);
            // Reference count: a node may be covered by several isolated ancestors;
            // cancelTaskIsolation only releases the node when the count drops to 0.
            isolationTaskCodesToTimesMap.put(isolationTaskCode,
                    isolationTaskCodesToTimesMap.getOrDefault(isolationTaskCode, 0) + 1);
            ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
            if (iTaskProcessor == null) {
                // Not yet submitted: submitTaskExec will see the counter and pause it on submit.
                logger.warn("The task has not been initialized, shouldn't need to isolated, taskCode: {}",
                        isolationTaskCode);
                continue;
            }
            iTaskProcessor.action(TaskAction.ISOLATE);
            // Publish the resulting task state so the workflow thread processes it
            // through the normal event queue (avoids racing the state machine).
            StateEvent stateEvent = new StateEvent();
            stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
            stateEvent.setProcessInstanceId(this.processInstance.getId());
            stateEvent.setTaskInstanceId(iTaskProcessor.taskInstance().getId());
            stateEvent.setExecutionStatus(iTaskProcessor.taskInstance().getState());
            this.addStateEvent(stateEvent);
        }
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}
/**
 * Cancel an isolation for the given task: decrement the isolation reference
 * count of the task and all of its downstream tasks; when a task's count drops
 * to zero, resume it (paused tasks go back to the standby queue, killed tasks
 * are re-submitted as a fresh cloned task instance).
 *
 * @param taskCode task code whose isolation is being cancelled
 */
public void cancelTaskIsolation(long taskCode) {
    try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        // todo:
        // restart the killed/ paused task
        Set<String> needToOfflineIsolationTaskCodes = DagHelper.getAllPostNodes(Long.toString(taskCode), dag);
        for (String needToOfflineIsolationTaskCodeStr : needToOfflineIsolationTaskCodes) {
            Long isolationTaskCode = Long.valueOf(needToOfflineIsolationTaskCodeStr);
            Integer isolateTimes = isolationTaskCodesToTimesMap.get(isolationTaskCode);
            if (isolateTimes == null) {
                logger.warn(
                        "The current task has not been isolated, so it don't need to offline isolation, taskCode: {}",
                        isolationTaskCode);
                continue;
            }
            if (isolateTimes == 1) {
                // Last isolation reference released: the task can be resumed.
                isolationTaskCodesToTimesMap.remove(isolationTaskCode);
                ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(isolationTaskCode);
                if (iTaskProcessor == null) {
                    // the current task has not been submitted
                    continue;
                }
                // the current task has no pre isolation, restart
                TaskInstance taskInstance = iTaskProcessor.taskInstance();
                if (taskInstance.getState() == ExecutionStatus.PAUSE_BY_ISOLATION) {
                    taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
                    addTaskToStandByList(taskInstance);
                    logger.info(
                            "Cancel isolation task, the current task state is pause_by_isolation, change to submitted_success and add it back to standbyList");
                    continue;
                }
                if (taskInstance.getState() == ExecutionStatus.KILL_BY_ISOLATION) {
                    // Killed tasks cannot be resumed in place: a clone is re-queued.
                    // NOTE(review): the log text below says "change to submitted_success"
                    // but this branch actually re-submits a cloned instance — confirm wording.
                    addTaskToStandByList(cloneCancelIsolationTaskInstance(taskInstance));
                    logger.info(
                            "Cancel isolation task, the current task state is kill_by_isolation, change to submitted_success and add it back to standbyList");
                    continue;
                }
            } else {
                // Still covered by another isolated ancestor: just decrement the count.
                isolationTaskCodesToTimesMap.put(isolationTaskCode, isolateTimes - 1);
            }
        }
    } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
    }
}
/**
* crate new task instance to retry, different objects from the original
*
......@@ -798,6 +886,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
// Rebuild the isolation reference counters from the DB so a recovered/failover
// workflow instance keeps the same isolation view it had before.
List<IsolationTask> isolationTasks =
        isolationTaskDao.queryByWorkflowInstanceId(processInstance.getId(), IsolationTaskStatus.ONLINE);
for (IsolationTask isolationTask : isolationTasks) {
    Set<String> allPostNodes = DagHelper.getAllPostNodes(Long.toString(isolationTask.getTaskCode()), dag);
    // Mirror onlineTaskIsolation: every post node gets its OWN reference count,
    // keyed by the post node's task code. (The previous code incremented the root
    // task's counter once per post node, leaving the post nodes with no counter,
    // so they would not be paused on submit.)
    allPostNodes.forEach(postNode -> {
        Long postNodeCode = Long.valueOf(postNode);
        isolationTaskCodesToTimesMap.put(postNodeCode,
                isolationTaskCodesToTimesMap.getOrDefault(postNodeCode, 0) + 1);
    });
}
logger.info("Build dag success, dag: {}", dag);
}
......@@ -933,6 +1030,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
if (isolationTaskCodesToTimesMap.containsKey(taskInstance.getTaskCode())) {
taskInstance.setState(ExecutionStatus.PAUSE_BY_ISOLATION);
logger.info("The current task has been isolated, will set status to PAUSE_BY_ISOLATION, taskCode: {}",
taskInstance.getTaskCode());
}
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
......@@ -1117,6 +1220,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return newTaskInstance;
}
private @Nullable TaskInstance cloneCancelIsolationTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
logger.error("clone retry task instance error, taskNode is null, code:{}", taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = TaskInstanceUtils.cloneTaskInstance(processInstance, taskNode, taskInstance);
TaskInstanceUtils.injectEnvironment(taskInstance, getTaskInstanceEnvironment(newTaskInstance));
return newTaskInstance;
}
private @Nullable Environment getTaskInstanceEnvironment(@NonNull TaskInstance taskInstance) {
Environment environment = null;
if (!taskInstance.getEnvironmentCode().equals(-1L)) {
......@@ -1468,17 +1582,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// success: every DAG task has been handled — derive the instance's final state.
if (state == ExecutionStatus.RUNNING_EXECUTION) {
    if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
        // tasks currently pending submission, no retries, indicating that depend is waiting to complete
        return ExecutionStatus.RUNNING_EXECUTION;
    }
    if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL))) {
        // tasks maybe killed manually
        return ExecutionStatus.FAILURE;
    }
    // This check must run BEFORE the SUCCESS fallthrough; the flattened original
    // returned SUCCESS unconditionally in an else-branch, leaving the isolation
    // check below unreachable (and an unused killTasks local plus an empty else-if).
    if (CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.KILL_BY_ISOLATION))
            || CollectionUtils.isNotEmpty(getCompleteTaskByState(ExecutionStatus.PAUSE_BY_ISOLATION))) {
        // No task need to submit, and exist isolation task, the workflow instance need to be PAUSE_BY_ISOLATION
        return ExecutionStatus.PAUSE_BY_ISOLATION;
    }
    // if the waiting queue is empty and the status is in progress, then success
    return ExecutionStatus.SUCCESS;
}
return state;
......
......@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
......@@ -111,6 +112,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected boolean timeout = false;
protected boolean isolated = false;
protected TaskInstance taskInstance = null;
protected ProcessInstance processInstance;
......@@ -160,6 +163,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean killTask();
protected abstract boolean isolateTask();
protected abstract boolean taskTimeout();
protected abstract boolean runTask();
......@@ -193,6 +198,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
case DISPATCH:
result = dispatch();
break;
case ISOLATE:
result = isolate();
break;
default:
logger.error("unknown task action: {}", taskAction);
}
......@@ -256,6 +264,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return paused;
}
/**
 * Idempotent ISOLATE action: once the task has been isolated successfully,
 * repeated calls are no-ops that keep returning true.
 *
 * @return true if the task is (now) isolated
 */
private boolean isolate() {
    if (!isolated) {
        isolated = isolateTask();
    }
    return isolated;
}
protected boolean stop() {
if (killed) {
return true;
......
......@@ -21,7 +21,6 @@ import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
......@@ -30,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
......@@ -89,6 +87,15 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean isolateTask() {
    // Move the blocking task straight to the terminal isolation-kill state and persist it.
    taskInstance.setEndTime(new Date());
    taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
    processService.saveTaskInstance(taskInstance);
    logger.info("Blocking task has been isolated");
    return true;
}
@Override
protected boolean taskTimeout() {
return true;
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
......@@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import java.util.Date;
import com.google.auto.service.AutoService;
/**
* common task processor
*/
......@@ -130,6 +129,27 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean isolateTask() {
    try {
        // Already in a terminal state: nothing left to isolate.
        if (taskInstance.getState().typeIsFinished()) {
            return true;
        }
        // Persist the isolation-kill state first.
        taskInstance.setEndTime(new Date());
        taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
        processService.updateTaskInstance(taskInstance);
        // A non-empty host means the task was dispatched; ask that worker to kill it.
        boolean dispatchedToWorker = StringUtils.isNotEmpty(taskInstance.getHost());
        if (dispatchedToWorker) {
            killRemoteTask();
        }
        logger.info("Master isolate taskInstance success, taskName: {} taskInstanceId: {}", taskInstance.getName(),
            taskInstance.getId());
        return true;
    } catch (Exception e) {
        logger.error("Master isolate task error, taskInstance id: {}", taskInstance.getId(), e);
        return false;
    }
}
@Override
protected boolean taskTimeout() {
return true;
......
......@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
......@@ -27,7 +26,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
......@@ -106,6 +104,15 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean isolateTask() {
    // Move the condition task straight to the terminal isolation-kill state and persist it.
    taskInstance.setEndTime(new Date());
    taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
    processService.saveTaskInstance(taskInstance);
    logger.info("Condition task has been isolated");
    return true;
}
@Override
public String getType() {
return TASK_TYPE_CONDITIONS;
......
......@@ -202,6 +202,14 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean isolateTask() {
    // Terminate the dependent task with the isolation-kill state and persist the change.
    taskInstance.setEndTime(new Date());
    taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
    processService.saveTaskInstance(taskInstance);
    return true;
}
/**
* judge all dependent tasks finish
*
......
......@@ -201,6 +201,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean isolateTask() {
    // Look up the sub-process instance spawned by this sub-workflow task.
    ProcessInstance subProcessInstance =
        processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
    // No sub process exists, or the task already reached a terminal state: nothing to isolate.
    if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
        return true;
    }
    // NOTE(review): isolation of a running sub process is not implemented yet —
    // this currently reports success without touching the sub process.
    // todo: isolate sub process
    return true;
}
private void sendToSubProcess() {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(),
......
......@@ -85,6 +85,14 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean isolateTask() {
    // Move the switch task straight to the terminal isolation-kill state and persist it.
    taskInstance.setEndTime(new Date());
    taskInstance.setState(ExecutionStatus.KILL_BY_ISOLATION);
    processService.saveTaskInstance(taskInstance);
    return true;
}
@Override
protected boolean taskTimeout() {
return true;
......
......@@ -22,6 +22,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
*/
public enum TaskAction {
PAUSE,
ISOLATE,
STOP,
TIMEOUT,
SUBMIT,
......
package org.apache.dolphinscheduler.server.master.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.dao.dto.IsolationTaskStatus;
import org.apache.dolphinscheduler.dao.entity.IsolationTask;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Keeps this master's in-memory snapshot of ONLINE isolation tasks in sync with the database
 * and notifies the affected {@link WorkflowExecuteRunnable}s when an isolation task is
 * brought online or taken offline.
 */
@Slf4j
@Service
public class IsolationTaskManager {

    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

    // Snapshot of ONLINE isolation tasks keyed by isolation-task id; replaced wholesale on refresh.
    private volatile Map<Long, IsolationTask> onlineIsolationTasksInMemory = new HashMap<>();

    // Ids of the tasks in the snapshot above; always replaced together with the map.
    private volatile Set<Long> currentIsolationTaskIdsInMemory = new HashSet<>();

    @Autowired
    private IsolationTaskDao isolationTaskDao;

    @PostConstruct
    public void init() {
        refreshIsolationTaskMapFromDB();
    }

    /**
     * Reload the ONLINE isolation tasks from the database, diff them against the in-memory
     * snapshot, and apply the resulting offline/online transitions.
     * <p>
     * Synchronized because this is a read-modify-write over two fields and the method is
     * public; concurrent refreshes (e.g. startup plus an external refresh request) would
     * otherwise race and could emit duplicate or lost transitions.
     */
    public synchronized void refreshIsolationTaskMapFromDB() {
        Map<Long, IsolationTask> totalOnlineIsolationTasksInDB =
            isolationTaskDao.queryByStatus(IsolationTaskStatus.ONLINE)
                .stream()
                .collect(Collectors.toMap(IsolationTask::getId, Function.identity()));
        Set<Long> totalOnlineIsolationTaskCodesInDB = totalOnlineIsolationTasksInDB.keySet();

        // Present in memory but no longer ONLINE in the DB -> take offline.
        Collection<IsolationTask> needToOfflineIsolationTasks =
            CollectionUtils.subtract(currentIsolationTaskIdsInMemory, totalOnlineIsolationTaskCodesInDB)
                .stream()
                .map(onlineIsolationTasksInMemory::get)
                .collect(Collectors.toList());
        // ONLINE in the DB but unknown in memory -> bring online.
        Collection<IsolationTask> needToOnlineIsolationTasks =
            CollectionUtils.subtract(totalOnlineIsolationTaskCodesInDB, currentIsolationTaskIdsInMemory)
                .stream()
                .map(totalOnlineIsolationTasksInDB::get)
                .collect(Collectors.toList());

        // NOTE(review): the snapshot is swapped before the notifications below run; if a
        // notification throws, that transition is not retried until the next refresh.
        currentIsolationTaskIdsInMemory = totalOnlineIsolationTaskCodesInDB;
        onlineIsolationTasksInMemory = totalOnlineIsolationTasksInDB;
        offlineIsolationTask(needToOfflineIsolationTasks);
        onlineIsolationTask(needToOnlineIsolationTasks);
    }

    // Cancel isolation on the owning workflow runnables; workflows not cached on this master are skipped.
    private void offlineIsolationTask(Collection<IsolationTask> needOfflineIsolationTasks) {
        if (CollectionUtils.isEmpty(needOfflineIsolationTasks)) {
            return;
        }
        for (IsolationTask needOfflineIsolation : needOfflineIsolationTasks) {
            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
                .getByProcessInstanceId(needOfflineIsolation.getWorkflowInstanceId());
            if (workflowExecuteRunnable == null) {
                continue;
            }
            workflowExecuteRunnable.cancelTaskIsolation(needOfflineIsolation.getTaskCode());
            log.info("Backend offline isolation task, isolationTaskId: {}", needOfflineIsolation.getId());
        }
    }

    // Apply isolation on the owning workflow runnables; workflows not cached on this master are skipped.
    private void onlineIsolationTask(Collection<IsolationTask> needOnlineIsolationTasks) {
        if (CollectionUtils.isEmpty(needOnlineIsolationTasks)) {
            return;
        }
        for (IsolationTask needOnlineIsolationTask : needOnlineIsolationTasks) {
            WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager
                .getByProcessInstanceId(needOnlineIsolationTask.getWorkflowInstanceId());
            if (workflowExecuteRunnable == null) {
                continue;
            }
            workflowExecuteRunnable.onlineTaskIsolation(needOnlineIsolationTask.getTaskCode());
            log.info("Backend online isolation task, isolationTaskId: {}", needOnlineIsolationTask.getId());
        }
    }
}
......@@ -166,7 +166,6 @@ public class TaskInstanceUtils {
taskInstance.setTaskDefine(oldTaskInstance.getTaskDefine());
taskInstance.setProcessDefine(oldTaskInstance.getProcessDefine());
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(oldTaskInstance.getState());
return taskInstance;
}
......
......@@ -36,6 +36,8 @@ public class WorkflowInstanceUtils {
return getStartTaskInstanceIdsFromRecoverParam(processInstance);
case START_FROM_STATE_CLEAN_TASKS:
return getStartTaskInstanceIdsFromStateCleanParam(processInstance);
case RECOVERY_FROM_ISOLATION_TASKS:
return getStartTaskInstanceIdsFromRecoverIsolationParam(processInstance);
default:
return Collections.emptyList();
}
......@@ -69,6 +71,22 @@ public class WorkflowInstanceUtils {
return stateCleanTaskInstanceIds;
}
/**
 * Collect the task-instance ids to restart when recovering a workflow from isolation:
 * the union of the previously paused-by-isolation and killed-by-isolation task ids
 * carried in the command parameters.
 * <p>
 * Fix: either command parameter may be absent, in which case
 * {@code JSONUtils.parseObject} yields {@code null}; the original code then threw a
 * NullPointerException from {@code result.addAll(null)}. Missing lists are now
 * treated as empty.
 *
 * @param processInstance workflow instance whose command parameters are inspected
 * @return ids of the isolated task instances to recover (possibly empty, never null)
 */
public static List<Integer> getStartTaskInstanceIdsFromRecoverIsolationParam(@NonNull ProcessInstance processInstance) {
    Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
    List<Integer> result = new ArrayList<>();
    List<Integer> recoveryPausedIsolationIds =
        JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
            new TypeReference<ArrayList<Integer>>() {
            });
    if (recoveryPausedIsolationIds != null) {
        result.addAll(recoveryPausedIsolationIds);
    }
    List<Integer> recoveryKilledIsolationIds =
        JSONUtils.parseObject(commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
            new TypeReference<ArrayList<Integer>>() {
            });
    if (recoveryKilledIsolationIds != null) {
        result.addAll(recoveryKilledIsolationIds);
    }
    return result;
}
public static List<String> getStartNodeName(@NonNull ProcessInstance processInstance) {
List<String> startNodeNameList = new ArrayList<>();
Map<String, String> paramMap = JSONUtils.toMap(processInstance.getCommandParam());
......
......@@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.dao.repository.IsolationTaskDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
......@@ -92,6 +93,8 @@ public class WorkflowExecuteRunnableTest {
private CuringParamsService curingGlobalParamsService;
private IsolationTaskDao isolationTaskDao;
@Before
public void init() throws Exception {
applicationContext = mock(ApplicationContext.class);
......@@ -124,10 +127,11 @@ public class WorkflowExecuteRunnableTest {
curingGlobalParamsService = mock(CuringParamsService.class);
NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
isolationTaskDao = mock(IsolationTaskDao.class);
workflowExecuteThread =
PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao,
nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread,
curingGlobalParamsService));
curingGlobalParamsService, isolationTaskDao));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
......
......@@ -119,5 +119,9 @@ public enum CommandType {
/**
* workflow executing data response, from master to api
*/
WORKFLOW_EXECUTING_DATA_RESPONSE;
WORKFLOW_EXECUTING_DATA_RESPONSE,
REFRESH_ISOLATION_REQUEST,
;
}
package org.apache.dolphinscheduler.remote.command.isolation;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;

import java.io.Serializable;

/**
 * Remote request asking a master to refresh its in-memory isolation-task state
 * from the database. Carries no payload fields; the command type alone triggers
 * the refresh.
 */
public class RefreshIsolationTaskRequest implements Serializable {

    // Explicit serialVersionUID keeps the serialized form stable across recompiles.
    private static final long serialVersionUID = 1L;

    /**
     * Wrap this request into a transport {@link Command} of type
     * {@code REFRESH_ISOLATION_REQUEST} with a JSON-encoded body.
     *
     * @return the command ready to be sent over the wire
     */
    public Command convert2Command() {
        Command command = new Command();
        command.setType(CommandType.REFRESH_ISOLATION_REQUEST);
        byte[] body = JSONUtils.toJsonByteArray(this);
        command.setBody(body);
        return command;
    }
}
......@@ -1047,6 +1047,27 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setRunTimes(runTime + 1);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
break;
case RECOVERY_FROM_ISOLATION_TASKS:
Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
List<Integer> recoveryPausedIsolationIds =
JSONUtils.parseObject(
commandParamMap.get(Constants.CMD_PARAM_RECOVERY_PAUSED_ISOLATED_TASK_IDS),
new TypeReference<ArrayList<Integer>>() {
});
if (CollectionUtils.isNotEmpty(recoveryPausedIsolationIds)) {
recoveryPausedIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
}
List<Integer> recoveryKilledIsolationIds =
JSONUtils.parseObject(
commandParamMap.get(Constants.CMD_PARAM_RECOVERY_KILLED_ISOLATED_TASK_IDS),
new TypeReference<ArrayList<Integer>>() {
});
if (CollectionUtils.isNotEmpty(recoveryKilledIsolationIds)) {
recoveryKilledIsolationIds.forEach(id -> initTaskInstance(findTaskInstanceById(id)));
}
processInstance.setRunTimes(runTime + 1);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
break;
default:
break;
}
......@@ -1564,9 +1585,6 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.getTaskCode());
return null;
}
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
taskInstance.setState(ExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
......@@ -1603,7 +1621,8 @@ public class ProcessServiceImpl implements ProcessService {
if (state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
|| state == ExecutionStatus.DISPATCH) {
|| state == ExecutionStatus.DISPATCH
|| state == ExecutionStatus.PAUSE_BY_ISOLATION) {
return state;
}
// return pasue /stop if process instance state is ready pause / stop
......
......@@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.api.enums;
import java.util.HashMap;
import com.baomidou.mybatisplus.annotation.EnumValue;
import java.util.HashMap;
/**
* running status for workflow and task nodes
*/
......@@ -65,6 +65,8 @@ public enum ExecutionStatus {
READY_BLOCK(15, "ready block"),
BLOCK(16, "block"),
DISPATCH(17, "dispatch"),
PAUSE_BY_ISOLATION(18, "paused by isolation"),
KILL_BY_ISOLATION(19, "killed by isolation"),
;
ExecutionStatus(int code, String descp) {
......@@ -117,10 +119,14 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsFinished() {
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() || typeIsPauseByIsolation()
|| typeIsStop() || typeIsBlock();
}
// "Ready" states are transitional: a pause/stop/block has been requested but the
// corresponding terminal state has not been reached yet.
public boolean typeIsReady() {
    return this == READY_PAUSE || this == READY_STOP || this == READY_BLOCK;
}
/**
* status is waiting thread
*
......@@ -139,6 +145,18 @@ public enum ExecutionStatus {
return this == PAUSE;
}
// True when the task was paused because its definition was put under isolation.
public boolean typeIsPauseByIsolation() {
    return this == PAUSE_BY_ISOLATION;
}
// True when the task was killed because its definition was put under isolation.
public boolean typeIsKilledByIsolation() {
    return this == KILL_BY_ISOLATION;
}
// A task counts as isolated when it was either paused or killed by isolation.
public boolean typeIsIsolated() {
    return typeIsPauseByIsolation() || typeIsKilledByIsolation();
}
/**
* status is pause
*
......@@ -172,7 +190,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsCancel() {
return this == KILL || this == STOP;
return this == KILL || this == STOP || this == KILL_BY_ISOLATION;
}
public int getCode() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册