提交 218ef514 编写于 作者: L lenboo

Merge remote-tracking branch 'upstream/dev-master-refactor' into dev-master-refactor

...@@ -72,8 +72,13 @@ ...@@ -72,8 +72,13 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<exclusions>
<exclusion>
<artifactId>jsr305</artifactId>
<groupId>com.google.code.findbugs</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
......
...@@ -53,6 +53,8 @@ import org.apache.dolphinscheduler.dao.entity.User; ...@@ -53,6 +53,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
...@@ -98,6 +100,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ ...@@ -98,6 +100,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
StateEventCallbackService stateEventCallbackService;
/** /**
* execute process instance * execute process instance
* *
...@@ -383,6 +388,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ ...@@ -383,6 +388,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal // determine whether the process is normal
if (update > 0) { if (update > 0) {
String host = processInstance.getHost();
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
);
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
......
...@@ -592,7 +592,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ...@@ -592,7 +592,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result; return result;
} }
processService.removeTaskLogFile(processInstanceId); try {
processService.removeTaskLogFile(processInstanceId);
} catch (Exception e) {
logger.error("remove task log failed", e);
}
// delete database cascade // delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId); int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
......
...@@ -58,6 +58,13 @@ ...@@ -58,6 +58,13 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>jsr305</artifactId>
<groupId>com.google.code.findbugs</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
...@@ -636,5 +643,10 @@ ...@@ -636,5 +643,10 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -435,6 +435,8 @@ public final class Constants { ...@@ -435,6 +435,8 @@ public final class Constants {
*/ */
public static final String DATASOURCE_PROPERTIES = "/datasource.properties"; public static final String DATASOURCE_PROPERTIES = "/datasource.properties";
public static final String COMMON_TASK_TYPE = "common";
public static final String DEFAULT = "Default"; public static final String DEFAULT = "Default";
public static final String USER = "user"; public static final String USER = "user";
public static final String PASSWORD = "password"; public static final String PASSWORD = "password";
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import io.netty.channel.Channel;
/**
* state event
*/
public class StateEvent {
/**
* origin_pid-origin_task_id-process_instance_id-task_instance_id
*/
private String key;
private StateEventType type;
private ExecutionStatus executionStatus;
private int taskInstanceId;
private int processInstanceId;
private String context;
private Channel channel;
public ExecutionStatus getExecutionStatus() {
return executionStatus;
}
public void setExecutionStatus(ExecutionStatus executionStatus) {
this.executionStatus = executionStatus;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public String getContext() {
return context;
}
public void setContext(String context) {
this.context = context;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
@Override
public String toString() {
return "State Event :"
+ "key: " + key
+ " type: " + type.toString()
+ " executeStatus: " + executionStatus
+ " task instance id: " + taskInstanceId
+ " process instance id: " + processInstanceId
+ " context: " + context
;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public void setType(StateEventType type) {
this.type = type;
}
public StateEventType getType() {
return this.type;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
public enum StateEventType {
PROCESS_STATE_CHANGE(0, "process statechange"),
TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout");
StateEventType(int code, String descp) {
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount; import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
...@@ -50,6 +52,10 @@ public interface CommandMapper extends BaseMapper<Command> { ...@@ -50,6 +52,10 @@ public interface CommandMapper extends BaseMapper<Command> {
@Param("endTime") Date endTime, @Param("endTime") Date endTime,
@Param("projectCodeArray") Long[] projectCodeArray); @Param("projectCodeArray") Long[] projectCodeArray);
/**
* query command page
* @return
*/
IPage<Command> queryCommandPage(IPage<Command> page);
} }
...@@ -43,4 +43,9 @@ ...@@ -43,4 +43,9 @@
</if> </if>
group by cmd.command_type group by cmd.command_type
</select> </select>
<select id="queryCommandPage" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select *
from t_ds_command
order by update_time asc
</select>
</mapper> </mapper>
...@@ -83,6 +83,16 @@ ...@@ -83,6 +83,16 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
......
...@@ -30,12 +30,12 @@ public enum CommandType { ...@@ -30,12 +30,12 @@ public enum CommandType {
REMOVE_TAK_LOG_RESPONSE, REMOVE_TAK_LOG_RESPONSE,
/** /**
* roll view log request * roll view log request
*/ */
ROLL_VIEW_LOG_REQUEST, ROLL_VIEW_LOG_REQUEST,
/** /**
* roll view log response * roll view log response
*/ */
ROLL_VIEW_LOG_RESPONSE, ROLL_VIEW_LOG_RESPONSE,
...@@ -109,17 +109,32 @@ public enum CommandType { ...@@ -109,17 +109,32 @@ public enum CommandType {
PING, PING,
/** /**
* pong * pong
*/ */
PONG, PONG,
/** /**
* alert send request * alert send request
*/ */
ALERT_SEND_REQUEST, ALERT_SEND_REQUEST,
/** /**
* alert send response * alert send response
*/ */
ALERT_SEND_RESPONSE; ALERT_SEND_RESPONSE,
/**
* process host update
*/
PROCESS_HOST_UPDATE_REQUST,
/**
* process host update response
*/
PROCESS_HOST_UPDATE_RESPONSE,
/**
* state event request
*/
STATE_EVENT_REQUEST;
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* process host update
*/
public class HostUpdateCommand implements Serializable {
/**
* task id
*/
private int taskInstanceId;
private String processHost;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getProcessHost() {
return processHost;
}
public void setProcessHost(String processHost) {
this.processHost = processHost;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "HostUpdateCommand{"
+ "taskInstanceId=" + taskInstanceId
+ "host=" + processHost
+ '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
public class HostUpdateResponseCommand implements Serializable {
private int taskInstanceId;
private String processHost;
private int status;
public HostUpdateResponseCommand(int taskInstanceId, String processHost, int code) {
this.taskInstanceId = taskInstanceId;
this.processHost = processHost;
this.status = code;
}
public int getTaskInstanceId() {
return this.taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getProcessHost() {
return this.processHost;
}
public void setProcessHost(String processHost) {
this.processHost = processHost;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "HostUpdateResponseCommand{"
+ "taskInstanceId=" + taskInstanceId
+ "host=" + processHost
+ '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* db task final result response command
*/
public class StateEventChangeCommand implements Serializable {
private String key;
private ExecutionStatus sourceStatus;
private int sourceProcessInstanceId;
private int sourceTaskInstanceId;
private int destProcessInstanceId;
private int destTaskInstanceId;
public StateEventChangeCommand() {
super();
}
public StateEventChangeCommand(int sourceProcessInstanceId, int sourceTaskInstanceId,
ExecutionStatus sourceStatus,
int destProcessInstanceId,
int destTaskInstanceId
) {
this.key = String.format("%d-%d-%d-%d",
sourceProcessInstanceId,
sourceTaskInstanceId,
destProcessInstanceId,
destTaskInstanceId);
this.sourceStatus = sourceStatus;
this.sourceProcessInstanceId = sourceProcessInstanceId;
this.sourceTaskInstanceId = sourceTaskInstanceId;
this.destProcessInstanceId = destProcessInstanceId;
this.destTaskInstanceId = destTaskInstanceId;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.STATE_EVENT_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "StateEventResponseCommand{"
+ "key=" + key
+ '}';
}
public ExecutionStatus getSourceStatus() {
return sourceStatus;
}
public void setSourceStatus(ExecutionStatus sourceStatus) {
this.sourceStatus = sourceStatus;
}
public int getSourceProcessInstanceId() {
return sourceProcessInstanceId;
}
public void setSourceProcessInstanceId(int sourceProcessInstanceId) {
this.sourceProcessInstanceId = sourceProcessInstanceId;
}
public int getSourceTaskInstanceId() {
return sourceTaskInstanceId;
}
public void setSourceTaskInstanceId(int sourceTaskInstanceId) {
this.sourceTaskInstanceId = sourceTaskInstanceId;
}
public int getDestProcessInstanceId() {
return destProcessInstanceId;
}
public void setDestProcessInstanceId(int destProcessInstanceId) {
this.destProcessInstanceId = destProcessInstanceId;
}
public int getDestTaskInstanceId() {
return destTaskInstanceId;
}
public void setDestTaskInstanceId(int destTaskInstanceId) {
this.destTaskInstanceId = destTaskInstanceId;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* db task final result response command
*/
public class StateEventResponseCommand implements Serializable {
private String key;
private int status;
public StateEventResponseCommand() {
super();
}
public StateEventResponseCommand(int status, String key) {
this.status = status;
this.key = key;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.DB_TASK_RESPONSE);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "StateEventResponseCommand{"
+ "key=" + key
+ ", status=" + status
+ '}';
}
}
...@@ -25,7 +25,7 @@ import java.util.Date; ...@@ -25,7 +25,7 @@ import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
/** /**
* execute task request command * execute task request command
*/ */
public class TaskExecuteAckCommand implements Serializable { public class TaskExecuteAckCommand implements Serializable {
...@@ -34,10 +34,15 @@ public class TaskExecuteAckCommand implements Serializable { ...@@ -34,10 +34,15 @@ public class TaskExecuteAckCommand implements Serializable {
*/ */
private int taskInstanceId; private int taskInstanceId;
/**
* process instance id
*/
private int processInstanceId;
/** /**
* startTime * startTime
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime; private Date startTime;
/** /**
...@@ -109,7 +114,7 @@ public class TaskExecuteAckCommand implements Serializable { ...@@ -109,7 +114,7 @@ public class TaskExecuteAckCommand implements Serializable {
} }
/** /**
* package request command * package request command
* *
* @return command * @return command
*/ */
...@@ -130,6 +135,15 @@ public class TaskExecuteAckCommand implements Serializable { ...@@ -130,6 +135,15 @@ public class TaskExecuteAckCommand implements Serializable {
+ ", status=" + status + ", status=" + status
+ ", logPath='" + logPath + '\'' + ", logPath='" + logPath + '\''
+ ", executePath='" + executePath + '\'' + ", executePath='" + executePath + '\''
+ ", processInstanceId='" + processInstanceId + '\''
+ '}'; + '}';
} }
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
} }
...@@ -32,8 +32,9 @@ public class TaskExecuteResponseCommand implements Serializable { ...@@ -32,8 +32,9 @@ public class TaskExecuteResponseCommand implements Serializable {
public TaskExecuteResponseCommand() { public TaskExecuteResponseCommand() {
} }
public TaskExecuteResponseCommand(int taskInstanceId) { public TaskExecuteResponseCommand(int taskInstanceId, int processInstanceId) {
this.taskInstanceId = taskInstanceId; this.taskInstanceId = taskInstanceId;
this.processInstanceId = processInstanceId;
} }
/** /**
...@@ -41,6 +42,11 @@ public class TaskExecuteResponseCommand implements Serializable { ...@@ -41,6 +42,11 @@ public class TaskExecuteResponseCommand implements Serializable {
*/ */
private int taskInstanceId; private int taskInstanceId;
/**
* process instance id
*/
private int processInstanceId;
/** /**
* status * status
*/ */
...@@ -139,4 +145,12 @@ public class TaskExecuteResponseCommand implements Serializable { ...@@ -139,4 +145,12 @@ public class TaskExecuteResponseCommand implements Serializable {
+ ", appIds='" + appIds + '\'' + ", appIds='" + appIds + '\''
+ '}'; + '}';
} }
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
} }
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.remote.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import io.netty.channel.Channel;
/**
* task callback service
*/
@Service
public class StateEventCallbackService {
private final Logger logger = LoggerFactory.getLogger(StateEventCallbackService.class);
private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
/**
* remote channels
*/
private static final ConcurrentHashMap<String, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public StateEventCallbackService() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/**
* add callback channel
*
* @param channel channel
*/
public void addRemoteChannel(String host, NettyRemoteChannel channel) {
REMOTE_CHANNELS.put(host, channel);
}
/**
* get callback channel
*
* @param host
* @return callback channel
*/
private NettyRemoteChannel newRemoteChannel(Host host) {
Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress());
if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) {
return nettyRemoteChannel;
}
}
newChannel = nettyRemotingClient.getChannel(host);
if (newChannel != null) {
return newRemoteChannel(newChannel, host.getAddress());
}
return null;
}
public int pause(int ntries) {
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
}
private NettyRemoteChannel newRemoteChannel(Channel newChannel, long opaque, String host) {
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
addRemoteChannel(host, remoteChannel);
return remoteChannel;
}
private NettyRemoteChannel newRemoteChannel(Channel newChannel, String host) {
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
addRemoteChannel(host, remoteChannel);
return remoteChannel;
}
/**
* remove callback channels
*/
public void remove(String host) {
REMOTE_CHANNELS.remove(host);
}
/**
* send result
*
* @param command command
*/
public void sendResult(String address, int port, Command command) {
logger.info("send result, host:{}, command:{}", address, command.toString());
Host host = new Host(address, port);
NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command);
}
}
}
...@@ -55,7 +55,16 @@ ...@@ -55,7 +55,16 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.powermock</groupId> <groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId> <artifactId>powermock-module-junit4</artifactId>
......
...@@ -24,14 +24,19 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; ...@@ -24,14 +24,19 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
...@@ -92,6 +97,11 @@ public class MasterServer implements IStoppable { ...@@ -92,6 +97,11 @@ public class MasterServer implements IStoppable {
@Autowired @Autowired
private MasterSchedulerService masterSchedulerService; private MasterSchedulerService masterSchedulerService;
@Autowired
private EventExecuteService eventExecuteService;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();
/** /**
* master server startup, not use web service * master server startup, not use web service
* *
...@@ -111,16 +121,28 @@ public class MasterServer implements IStoppable { ...@@ -111,16 +121,28 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort()); serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); TaskAckProcessor ackProcessor = new TaskAckProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); ackProcessor.init(processInstanceExecMaps);
TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
taskResponseProcessor.init(processInstanceExecMaps);
StateEventProcessor stateEventProcessor = new StateEventProcessor();
stateEventProcessor.init(processInstanceExecMaps);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// self tolerant // self tolerant
this.masterRegistryClient.init(this.processInstanceExecMaps);
this.masterRegistryClient.start(); this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this); this.masterRegistryClient.setRegistryStoppable(this);
this.eventExecuteService.init(this.processInstanceExecMaps);
this.eventExecuteService.start();
// scheduler start // scheduler start
this.masterSchedulerService.init(this.processInstanceExecMaps);
this.masterSchedulerService.start(); this.masterSchedulerService.start();
// start QuartzExecutors // start QuartzExecutors
......
...@@ -45,6 +45,9 @@ public class MasterConfig { ...@@ -45,6 +45,9 @@ public class MasterConfig {
@Value("${master.heartbeat.interval:10}") @Value("${master.heartbeat.interval:10}")
private int masterHeartbeatInterval; private int masterHeartbeatInterval;
@Value("${master.state.wheel.interval:5}")
private int stateWheelInterval;
@Value("${master.task.commit.retryTimes:5}") @Value("${master.task.commit.retryTimes:5}")
private int masterTaskCommitRetryTimes; private int masterTaskCommitRetryTimes;
...@@ -139,4 +142,12 @@ public class MasterConfig { ...@@ -139,4 +142,12 @@ public class MasterConfig {
public void setMasterDispatchTaskNumber(int masterDispatchTaskNumber) { public void setMasterDispatchTaskNumber(int masterDispatchTaskNumber) {
this.masterDispatchTaskNumber = masterDispatchTaskNumber; this.masterDispatchTaskNumber = masterDispatchTaskNumber;
} }
public int getStateWheelInterval() {
return this.stateWheelInterval;
}
public void setStateWheelInterval(int stateWheelInterval) {
this.stateWheelInterval = stateWheelInterval;
}
} }
\ No newline at end of file
...@@ -150,7 +150,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{ ...@@ -150,7 +150,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
* @param command command * @param command command
* @throws ExecuteException if error throws ExecuteException * @throws ExecuteException if error throws ExecuteException
*/ */
private void doExecute(final Host host, final Command command) throws ExecuteException { public void doExecute(final Host host, final Command command) throws ExecuteException {
/** /**
* retry count,default retry 3 * retry count,default retry 3
*/ */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
public class HostUpdateResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(HostUpdateResponseProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
HostUpdateResponseProcessor responseCommand = JSONUtils.parseObject(command.getBody(), HostUpdateResponseProcessor.class);
logger.info("received process host response command : {}", responseCommand);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
* handle state event received from master/api
*/
public class StateEventProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(StateEventProcessor.class);
private StateEventResponseService stateEventResponseService;
public StateEventProcessor() {
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
}
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.stateEventResponseService.init(processInstanceExecMaps);
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));
StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class);
StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
stateEvent.setKey(stateEventChangeCommand.getKey());
stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
stateEvent.setType(type);
logger.info("received command : {}", stateEvent.toString());
stateEventResponseService.addResponse(stateEvent);
}
}
...@@ -29,15 +29,18 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; ...@@ -29,15 +29,18 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.Channel; import io.netty.channel.Channel;
/** /**
* task ack processor * task ack processor
*/ */
public class TaskAckProcessor implements NettyRequestProcessor { public class TaskAckProcessor implements NettyRequestProcessor {
...@@ -53,13 +56,18 @@ public class TaskAckProcessor implements NettyRequestProcessor { ...@@ -53,13 +56,18 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/ */
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor(){ public TaskAckProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}
/** /**
* task ack process * task ack process
*
* @param channel channel channel * @param channel channel channel
* @param command command TaskExecuteAckCommand * @param command command TaskExecuteAckCommand
*/ */
...@@ -82,7 +90,8 @@ public class TaskAckProcessor implements NettyRequestProcessor { ...@@ -82,7 +90,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskAckCommand.getExecutePath(), taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(), taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId(), taskAckCommand.getTaskInstanceId(),
channel); channel,
taskAckCommand.getProcessInstanceId());
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
} }
......
...@@ -28,15 +28,18 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; ...@@ -28,15 +28,18 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.Channel; import io.netty.channel.Channel;
/** /**
* task response processor * task response processor
*/ */
public class TaskResponseProcessor implements NettyRequestProcessor { public class TaskResponseProcessor implements NettyRequestProcessor {
...@@ -52,11 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor { ...@@ -52,11 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/ */
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor(){ public TaskResponseProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}
/** /**
* task final result response * task final result response
* need master process , state persistence * need master process , state persistence
...@@ -80,10 +87,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor { ...@@ -80,10 +87,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getAppIds(), responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(), responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(), responseCommand.getVarPool(),
channel channel,
); responseCommand.getProcessInstanceId()
);
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
/**
* task manager
*/
@Component
public class StateEventResponseService {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(StateEventResponseService.class);
/**
* attemptQueue
*/
private final BlockingQueue<StateEvent> eventQueue = new LinkedBlockingQueue<>(5000);
/**
* task response worker
*/
private Thread responseWorker;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
}
}
@PostConstruct
public void start() {
this.responseWorker = new StateEventResponseWorker();
this.responseWorker.setName("StateEventResponseWorker");
this.responseWorker.start();
}
@PreDestroy
public void stop() {
this.responseWorker.interrupt();
if (!eventQueue.isEmpty()) {
List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
for (StateEvent event : remainEvents) {
this.persist(event);
}
}
}
/**
* put task to attemptQueue
*/
public void addResponse(StateEvent stateEvent) {
try {
eventQueue.put(stateEvent);
} catch (InterruptedException e) {
logger.error("put state event : {} error :{}", stateEvent, e);
Thread.currentThread().interrupt();
}
}
/**
* task worker thread
*/
class StateEventResponseWorker extends Thread {
@Override
public void run() {
while (Stopper.isRunning()) {
try {
// if not task , blocking here
StateEvent stateEvent = eventQueue.take();
persist(stateEvent);
} catch (Exception e) {
logger.error("persist task error", e);
}
}
logger.info("StateEventResponseWorker stopped");
}
}
private void writeResponse(StateEvent stateEvent, ExecutionStatus status) {
Channel channel = stateEvent.getChannel();
if (channel != null) {
StateEventResponseCommand command = new StateEventResponseCommand(status.getCode(), stateEvent.getKey());
channel.writeAndFlush(command.convert2Command());
}
}
private void persist(StateEvent stateEvent) {
if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) {
writeResponse(stateEvent, ExecutionStatus.FAILURE);
return;
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
}
public BlockingQueue<StateEvent> getEventQueue() {
return eventQueue;
}
}
...@@ -92,6 +92,8 @@ public class TaskResponseEvent { ...@@ -92,6 +92,8 @@ public class TaskResponseEvent {
* channel * channel
*/ */
private Channel channel; private Channel channel;
private int processInstanceId;
public static TaskResponseEvent newAck(ExecutionStatus state, public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime, Date startTime,
...@@ -99,7 +101,8 @@ public class TaskResponseEvent { ...@@ -99,7 +101,8 @@ public class TaskResponseEvent {
String executePath, String executePath,
String logPath, String logPath,
int taskInstanceId, int taskInstanceId,
Channel channel) { Channel channel,
int processInstanceId) {
TaskResponseEvent event = new TaskResponseEvent(); TaskResponseEvent event = new TaskResponseEvent();
event.setState(state); event.setState(state);
event.setStartTime(startTime); event.setStartTime(startTime);
...@@ -109,6 +112,7 @@ public class TaskResponseEvent { ...@@ -109,6 +112,7 @@ public class TaskResponseEvent {
event.setTaskInstanceId(taskInstanceId); event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.ACK); event.setEvent(Event.ACK);
event.setChannel(channel); event.setChannel(channel);
event.setProcessInstanceId(processInstanceId);
return event; return event;
} }
...@@ -118,7 +122,8 @@ public class TaskResponseEvent { ...@@ -118,7 +122,8 @@ public class TaskResponseEvent {
String appIds, String appIds,
int taskInstanceId, int taskInstanceId,
String varPool, String varPool,
Channel channel) { Channel channel,
int processInstanceId) {
TaskResponseEvent event = new TaskResponseEvent(); TaskResponseEvent event = new TaskResponseEvent();
event.setState(state); event.setState(state);
event.setEndTime(endTime); event.setEndTime(endTime);
...@@ -128,6 +133,7 @@ public class TaskResponseEvent { ...@@ -128,6 +133,7 @@ public class TaskResponseEvent {
event.setEvent(Event.RESULT); event.setEvent(Event.RESULT);
event.setVarPool(varPool); event.setVarPool(varPool);
event.setChannel(channel); event.setChannel(channel);
event.setProcessInstanceId(processInstanceId);
return event; return event;
} }
...@@ -227,4 +233,11 @@ public class TaskResponseEvent { ...@@ -227,4 +233,11 @@ public class TaskResponseEvent {
this.channel = channel; this.channel = channel;
} }
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
} }
...@@ -19,15 +19,19 @@ package org.apache.dolphinscheduler.server.master.processor.queue; ...@@ -19,15 +19,19 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
...@@ -54,8 +58,7 @@ public class TaskResponseService { ...@@ -54,8 +58,7 @@ public class TaskResponseService {
/** /**
* attemptQueue * attemptQueue
*/ */
private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>(5000); private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>();
/** /**
* process service * process service
...@@ -68,10 +71,18 @@ public class TaskResponseService { ...@@ -68,10 +71,18 @@ public class TaskResponseService {
*/ */
private Thread taskResponseWorker; private Thread taskResponseWorker;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
}
}
@PostConstruct @PostConstruct
public void start() { public void start() {
this.taskResponseWorker = new TaskResponseWorker(); this.taskResponseWorker = new TaskResponseWorker();
this.taskResponseWorker.setName("TaskResponseWorker"); this.taskResponseWorker.setName("StateEventResponseWorker");
this.taskResponseWorker.start(); this.taskResponseWorker.start();
} }
...@@ -121,7 +132,7 @@ public class TaskResponseService { ...@@ -121,7 +132,7 @@ public class TaskResponseService {
logger.error("persist task error", e); logger.error("persist task error", e);
} }
} }
logger.info("TaskResponseWorker stopped"); logger.info("StateEventResponseWorker stopped");
} }
} }
...@@ -134,18 +145,18 @@ public class TaskResponseService { ...@@ -134,18 +145,18 @@ public class TaskResponseService {
Event event = taskResponseEvent.getEvent(); Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel(); Channel channel = taskResponseEvent.getChannel();
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
switch (event) { switch (event) {
case ACK: case ACK:
try { try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null) { if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(); ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
processService.changeTaskState(taskInstance, status, processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(), taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(), taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(), taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(), taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId()); taskResponseEvent.getTaskInstanceId());
} }
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
...@@ -158,14 +169,13 @@ public class TaskResponseService { ...@@ -158,14 +169,13 @@ public class TaskResponseService {
break; break;
case RESULT: case RESULT:
try { try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null) { if (taskInstance != null) {
processService.changeTaskState(taskInstance, taskResponseEvent.getState(), processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(), taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(), taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(), taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool() taskResponseEvent.getVarPool()
); );
} }
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
...@@ -180,6 +190,15 @@ public class TaskResponseService { ...@@ -180,6 +190,15 @@ public class TaskResponseService {
default: default:
throw new IllegalArgumentException("invalid event type : " + event); throw new IllegalArgumentException("invalid event type : " + event);
} }
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThread.addStateEvent(stateEvent);
}
} }
public BlockingQueue<TaskResponseEvent> getEventQueue() { public BlockingQueue<TaskResponseEvent> getEventQueue() {
......
...@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.Constants; ...@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
...@@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; ...@@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
...@@ -45,12 +48,11 @@ import org.apache.dolphinscheduler.spi.register.RegistryConnectState; ...@@ -45,12 +48,11 @@ import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -90,6 +92,8 @@ public class MasterRegistryClient { ...@@ -90,6 +92,8 @@ public class MasterRegistryClient {
*/ */
private ScheduledExecutorService heartBeatExecutor; private ScheduledExecutorService heartBeatExecutor;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
/** /**
* master start time * master start time
*/ */
...@@ -97,6 +101,13 @@ public class MasterRegistryClient { ...@@ -97,6 +101,13 @@ public class MasterRegistryClient {
private String localNodePath; private String localNodePath;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.startTime = DateUtils.dateToString(new Date());
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
this.processInstanceExecMaps = processInstanceExecMaps;
}
public void start() { public void start() {
String nodeLock = registryClient.getMasterStartUpLockPath(); String nodeLock = registryClient.getMasterStartUpLockPath();
try { try {
...@@ -182,7 +193,7 @@ public class MasterRegistryClient { ...@@ -182,7 +193,7 @@ public class MasterRegistryClient {
failoverMaster(serverHost); failoverMaster(serverHost);
break; break;
case WORKER: case WORKER:
failoverWorker(serverHost, true); failoverWorker(serverHost, true, true);
break; break;
default: default:
break; break;
...@@ -265,7 +276,7 @@ public class MasterRegistryClient { ...@@ -265,7 +276,7 @@ public class MasterRegistryClient {
* @param workerHost worker host * @param workerHost worker host
* @param needCheckWorkerAlive need check worker alive * @param needCheckWorkerAlive need check worker alive
*/ */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) { private void failoverWorker(String workerHost, boolean needCheckWorkerAlive, boolean checkOwner) {
logger.info("start worker[{}] failover ...", workerHost); logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) { for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
...@@ -276,19 +287,36 @@ public class MasterRegistryClient { ...@@ -276,19 +287,36 @@ public class MasterRegistryClient {
} }
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (processInstance != null) { if (workerHost == null
taskInstance.setProcessInstance(processInstance); || !checkOwner
} || processInstance.getHost().equalsIgnoreCase(workerHost)) {
// only failover the task owned myself if worker down.
// failover master need handle worker at the same time
if (processInstance != null) {
taskInstance.setProcessInstance(processInstance);
}
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance) .buildProcessInstanceRelatedInfo(processInstance)
.create(); .create();
// only kill yarn job if exists , the local thread has exited // only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext); ProcessUtils.killYarnJob(taskExecutionContext);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
if (!processInstanceExecMaps.containsKey(processInstance.getId())) {
return;
}
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId());
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadNotify.addStateEvent(stateEvent);
}
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
} }
logger.info("end worker[{}] failover ...", workerHost); logger.info("end worker[{}] failover ...", workerHost);
} }
...@@ -312,6 +340,7 @@ public class MasterRegistryClient { ...@@ -312,6 +340,7 @@ public class MasterRegistryClient {
} }
processService.processNeedFailoverProcessInstances(processInstance); processService.processNeedFailoverProcessInstances(processInstance);
} }
failoverWorker(masterHost, true, false);
logger.info("master failover end"); logger.info("master failover end");
} }
...@@ -324,12 +353,6 @@ public class MasterRegistryClient { ...@@ -324,12 +353,6 @@ public class MasterRegistryClient {
registryClient.releaseLock(registryClient.getMasterLockPath()); registryClient.releaseLock(registryClient.getMasterLockPath());
} }
@PostConstruct
public void init() {
this.startTime = DateUtils.dateToString(new Date());
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
/** /**
* registry * registry
...@@ -337,8 +360,6 @@ public class MasterRegistryClient { ...@@ -337,8 +360,6 @@ public class MasterRegistryClient {
public void registry() { public void registry() {
String address = NetUtils.getAddr(masterConfig.getListenPort()); String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath(); localNodePath = getMasterPath();
registryClient.persistEphemeral(localNodePath, "");
registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterMaxCpuloadAvg(),
...@@ -347,6 +368,8 @@ public class MasterRegistryClient { ...@@ -347,6 +368,8 @@ public class MasterRegistryClient {
Constants.MASTER_TYPE, Constants.MASTER_TYPE,
registryClient); registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.heartBeatInfo());
registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
...@@ -369,13 +392,17 @@ public class MasterRegistryClient { ...@@ -369,13 +392,17 @@ public class MasterRegistryClient {
* remove registry info * remove registry info
*/ */
public void unRegistry() { public void unRegistry() {
String address = getLocalAddress(); try {
String localNodePath = getMasterPath(); String address = getLocalAddress();
registryClient.remove(localNodePath); String localNodePath = getMasterPath();
logger.info("master node : {} unRegistry to register center.", address); registryClient.remove(localNodePath);
heartBeatExecutor.shutdown(); logger.info("master node : {} unRegistry to register center.", address);
logger.info("heartbeat executor shutdown"); heartBeatExecutor.shutdown();
registryClient.close(); logger.info("heartbeat executor shutdown");
registryClient.close();
} catch (Exception e) {
logger.error("remove registry path exception ", e);
}
} }
/** /**
......
...@@ -22,17 +22,21 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED ...@@ -22,17 +22,21 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.register.DataChangeEvent; import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
import org.apache.dolphinscheduler.spi.register.SubscribeListener; import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
...@@ -108,12 +112,26 @@ public class ServerNodeManager implements InitializingBean { ...@@ -108,12 +112,26 @@ public class ServerNodeManager implements InitializingBean {
@Autowired @Autowired
private WorkerGroupMapper workerGroupMapper; private WorkerGroupMapper workerGroupMapper;
private MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
/** /**
* alert dao * alert dao
*/ */
@Autowired @Autowired
private AlertDao alertDao; private AlertDao alertDao;
public static volatile List<Integer> SLOT_LIST = new ArrayList<>();
public static volatile Integer MASTER_SIZE = 0;
public static Integer getSlot() {
if (SLOT_LIST.size() > 0) {
return SLOT_LIST.get(0);
}
return 0;
}
/** /**
* init listener * init listener
* *
...@@ -143,12 +161,11 @@ public class ServerNodeManager implements InitializingBean { ...@@ -143,12 +161,11 @@ public class ServerNodeManager implements InitializingBean {
/** /**
* load nodes from zookeeper * load nodes from zookeeper
*/ */
private void load() { public void load() {
/** /**
* master nodes from zookeeper * master nodes from zookeeper
*/ */
Set<String> initMasterNodes = registryClient.getMasterNodesDirectly(); updateMasterNodes();
syncMasterNodes(initMasterNodes);
/** /**
* worker group nodes from zookeeper * worker group nodes from zookeeper
...@@ -241,13 +258,11 @@ public class ServerNodeManager implements InitializingBean { ...@@ -241,13 +258,11 @@ public class ServerNodeManager implements InitializingBean {
try { try {
if (dataChangeEvent.equals(DataChangeEvent.ADD)) { if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
logger.info("master node : {} added.", path); logger.info("master node : {} added.", path);
Set<String> currentNodes = registryClient.getMasterNodesDirectly(); updateMasterNodes();
syncMasterNodes(currentNodes);
} }
if (dataChangeEvent.equals(DataChangeEvent.REMOVE)) { if (dataChangeEvent.equals(DataChangeEvent.REMOVE)) {
logger.info("master node : {} down.", path); logger.info("master node : {} down.", path);
Set<String> currentNodes = registryClient.getMasterNodesDirectly(); updateMasterNodes();
syncMasterNodes(currentNodes);
alertDao.sendServerStopedAlert(1, path, "MASTER"); alertDao.sendServerStopedAlert(1, path, "MASTER");
} }
} catch (Exception ex) { } catch (Exception ex) {
...@@ -257,6 +272,23 @@ public class ServerNodeManager implements InitializingBean { ...@@ -257,6 +272,23 @@ public class ServerNodeManager implements InitializingBean {
} }
} }
private void updateMasterNodes() {
SLOT_LIST.clear();
this.masterNodes.clear();
String nodeLock = registryClient.getMasterLockPath();
try {
registryClient.getLock(nodeLock);
Set<String> currentNodes = registryClient.getMasterNodesDirectly();
List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
syncMasterNodes(currentNodes, masterNodes);
} catch (Exception e) {
logger.error("update master nodes error", e);
} finally {
registryClient.releaseLock(nodeLock);
}
}
/** /**
* get master nodes * get master nodes
* *
...@@ -274,13 +306,23 @@ public class ServerNodeManager implements InitializingBean { ...@@ -274,13 +306,23 @@ public class ServerNodeManager implements InitializingBean {
/** /**
* sync master nodes * sync master nodes
* *
* @param nodes master nodes * @param nodes master nodes
* @param masterNodes
*/ */
private void syncMasterNodes(Set<String> nodes) { private void syncMasterNodes(Set<String> nodes, List<Server> masterNodes) {
masterLock.lock(); masterLock.lock();
try { try {
masterNodes.clear(); this.masterNodes.addAll(nodes);
masterNodes.addAll(nodes); this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes);
int index = masterPriorityQueue.getIndex(NetUtils.getHost());
if (index >= 0) {
MASTER_SIZE = nodes.size();
SLOT_LIST.add(masterPriorityQueue.getIndex(NetUtils.getHost()));
}
logger.info("update master nodes, master size: {}, slot: {}",
MASTER_SIZE, SLOT_LIST.toString()
);
} finally { } finally {
masterLock.unlock(); masterLock.unlock();
} }
...@@ -290,7 +332,7 @@ public class ServerNodeManager implements InitializingBean { ...@@ -290,7 +332,7 @@ public class ServerNodeManager implements InitializingBean {
* sync worker group nodes * sync worker group nodes
* *
* @param workerGroup worker group * @param workerGroup worker group
* @param nodes worker nodes * @param nodes worker nodes
*/ */
private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) { private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) {
workerGroupLock.lock(); workerGroupLock.lock();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@Service
public class EventExecuteService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
/**
* dolphinscheduler database interface
*/
@Autowired
private ProcessService processService;
@Autowired
private MasterConfig masterConfig;
private ExecutorService eventExecService;
/**
*
*/
private StateEventCallbackService stateEventCallbackService;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap();
ListeningExecutorService listeningExecutorService;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads());
this.processInstanceExecMaps = processInstanceExecMaps;
listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService);
this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
}
@Override
public synchronized void start() {
super.setName("EventServiceStarted");
super.start();
}
public void close() {
eventExecService.shutdown();
logger.info("event service stopped...");
}
@Override
public void run() {
logger.info("Event service started");
while (Stopper.isRunning()) {
try {
eventHandler();
} catch (Exception e) {
logger.error("Event service thread error", e);
}
}
}
private void eventHandler() {
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) {
if (workflowExecuteThread.eventSize() == 0
|| StringUtils.isEmpty(workflowExecuteThread.getKey())
|| eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
continue;
}
int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
logger.info("handle process instance : {} events, count:{}",
processInstanceId,
workflowExecuteThread.eventSize());
logger.info("already exists handler process size:{}", this.eventHandlerMap.size());
eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
if (workflowExecuteThread.workFlowFinish()) {
processInstanceExecMaps.remove(processInstanceId);
notifyProcessChanged();
logger.info("process instance {} finished.", processInstanceId);
}
if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) {
processInstanceExecMaps.remove(processInstanceId);
processInstanceExecMaps.put(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread);
}
eventHandlerMap.remove(workflowExecuteThread.getKey());
}
private void notifyProcessChanged() {
Map<ProcessInstance, TaskInstance> fatherMaps
= processService.notifyProcessList(processInstanceId, 0);
for (ProcessInstance processInstance : fatherMaps.keySet()) {
String address = NetUtils.getAddr(masterConfig.getListenPort());
if (processInstance.getHost().equalsIgnoreCase(address)) {
notifyMyself(processInstance, fatherMaps.get(processInstance));
} else {
notifyProcess(processInstance, fatherMaps.get(processInstance));
}
}
}
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
if (!processInstanceExecMaps.containsKey(processInstance.getId())) {
return;
}
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId());
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workflowExecuteThreadNotify.addStateEvent(stateEvent);
}
private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) {
String host = processInstance.getHost();
if (StringUtils.isEmpty(host)) {
logger.info("process {} host is empty, cannot notify task {} now.",
processInstance.getId(), taskInstance.getId());
return;
}
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
logger.info("notify process {} task {} state change, host:{}",
processInstance.getId(), taskInstance.getId(), host);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId()
);
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
}
@Override
public void onFailure(Throwable throwable) {
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import java.util.Date;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* master task exec base class
*/
public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* logger of MasterBaseTaskExecThread
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
/**
* process service
*/
protected ProcessService processService;
/**
* alert database access
*/
protected AlertDao alertDao;
/**
* process instance
*/
protected ProcessInstance processInstance;
/**
* task instance
*/
protected TaskInstance taskInstance;
/**
* whether need cancel
*/
protected boolean cancel;
/**
* master config
*/
protected MasterConfig masterConfig;
/**
* taskUpdateQueue
*/
private TaskPriorityQueue taskUpdateQueue;
/**
* whether need check task time out.
*/
protected boolean checkTimeoutFlag = false;
/**
* task timeout parameters
*/
protected TaskTimeoutParameter taskTimeoutParameter;
/**
* constructor of MasterBaseTaskExecThread
*
* @param taskInstance task instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance) {
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
initTaskParams();
}
/**
* init task ordinary parameters
*/
private void initTaskParams() {
initTimeoutParams();
}
/**
* init task timeout parameters
*/
private void initTimeoutParams() {
TaskDefinition taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
boolean timeoutEnable = taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN;
taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable,
taskDefinition.getTimeoutNotifyStrategy(),
taskDefinition.getTimeout());
if (taskTimeoutParameter.getEnable()) {
checkTimeoutFlag = true;
}
}
/**
* get task instance
*
* @return TaskInstance
*/
public TaskInstance getTaskInstance() {
return this.taskInstance;
}
/**
* kill master base task exec thread
*/
public void kill() {
this.cancel = true;
}
/**
* submit master base task exec thread
*
* @return TaskInstance
*/
protected TaskInstance submit() {
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
int retryTimes = 1;
boolean submitDB = false;
boolean submitTask = false;
TaskInstance task = null;
while (retryTimes <= commitRetryTimes) {
try {
if (!submitDB) {
// submit task to db
task = processService.submitTask(taskInstance);
if (task != null && task.getId() != 0) {
submitDB = true;
}
}
if (submitDB && !submitTask) {
// dispatch task
submitTask = dispatchTask(task);
}
if (submitDB && submitTask) {
return task;
}
if (!submitDB) {
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
} else if (!submitTask) {
logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
}
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
logger.error("task commit to mysql and dispatcht task failed", e);
}
retryTimes += 1;
}
return task;
}
/**
* dispatch task
*
* @param taskInstance taskInstance
* @return whether submit task success
*/
public Boolean dispatchTask(TaskInstance taskInstance) {
try {
if (taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
|| taskInstance.isSubProcess()
|| taskInstance.isSwitchTask()
) {
return true;
}
if (taskInstance.getState().typeIsFinished()) {
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot be submitted because its execution state is RUNNING or DELAY.
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: {}", taskInstance);
/**
* taskPriority
*/
TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance));
return false;
}
}
/**
* buildTaskPriority
*
* @param processInstancePriority processInstancePriority
* @param processInstanceId processInstanceId
* @param taskInstancePriority taskInstancePriority
* @param taskInstanceId taskInstanceId
* @param workerGroup workerGroup
* @return TaskPriority
*/
private TaskPriority buildTaskPriority(int processInstancePriority,
int processInstanceId,
int taskInstancePriority,
int taskInstanceId,
String workerGroup) {
return new TaskPriority(processInstancePriority, processInstanceId,
taskInstancePriority, taskInstanceId, workerGroup);
}
/**
* submit wait complete
*
* @return true
*/
protected Boolean submitWaitComplete() {
return true;
}
/**
* call
*
* @return boolean
*/
@Override
public Boolean call() {
this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
return submitWaitComplete();
}
/**
* alert time out
*/
protected boolean alertTimeout() {
if (TaskTimeoutStrategy.FAILED == this.taskTimeoutParameter.getStrategy()) {
return true;
}
logger.warn("process id:{} process name:{} task id: {},name:{} execution time out",
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
// send warn mail
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return true;
}
/**
* handle time out for time out strategy warn&&failed
*/
protected void handleTimeoutFailed() {
if (TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy()) {
return;
}
logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.",
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
this.cancel = true;
}
/**
* check task remain time valid
*/
protected boolean checkTaskTimeout() {
if (!checkTimeoutFlag || taskInstance.getStartTime() == null) {
return false;
}
long remainTime = getRemainTime(taskTimeoutParameter.getInterval() * 60L);
return remainTime <= 0;
}
/**
* get remain time
*
* @return remain time
*/
protected long getRemainTime(long timeoutSeconds) {
Date startTime = taskInstance.getStartTime();
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
return timeoutSeconds - usedTime;
}
protected String getThreadName() {
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
return String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
}
}
...@@ -24,25 +24,28 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; ...@@ -24,25 +24,28 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* master scheduler thread * master scheduler thread
*/ */
@Service @Service
public class MasterSchedulerService extends Thread { public class MasterSchedulerService extends Thread {
...@@ -77,30 +80,46 @@ public class MasterSchedulerService extends Thread { ...@@ -77,30 +80,46 @@ public class MasterSchedulerService extends Thread {
private ProcessAlertManager processAlertManager; private ProcessAlertManager processAlertManager;
/** /**
* netty remoting client * netty remoting client
*/ */
private NettyRemotingClient nettyRemotingClient; private NettyRemotingClient nettyRemotingClient;
@Autowired
NettyExecutorManager nettyExecutorManager;
/** /**
* master exec service * master exec service
*/ */
private ThreadPoolExecutor masterExecService; private ThreadPoolExecutor masterExecService;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
private StateWheelExecuteThread stateWheelExecuteThread;
/** /**
* constructor of MasterSchedulerService * constructor of MasterSchedulerService
*/ */
@PostConstruct public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
public void init() { this.processInstanceExecMaps = processInstanceExecMaps;
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig(); NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
taskTimeoutCheckList,
this.processInstanceExecMaps,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
} }
@Override @Override
public synchronized void start() { public synchronized void start() {
super.setName("MasterSchedulerService"); super.setName("MasterSchedulerService");
super.start(); super.start();
this.stateWheelExecuteThread.start();
} }
public void close() { public void close() {
...@@ -131,10 +150,6 @@ public class MasterSchedulerService extends Thread { ...@@ -131,10 +150,6 @@ public class MasterSchedulerService extends Thread {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue; continue;
} }
// todo 串行执行 为何还需要判断状态?
/* if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
scheduleProcess();
}*/
scheduleProcess(); scheduleProcess();
} catch (Exception e) { } catch (Exception e) {
logger.error("master scheduler thread error", e); logger.error("master scheduler thread error", e);
...@@ -142,43 +157,78 @@ public class MasterSchedulerService extends Thread { ...@@ -142,43 +157,78 @@ public class MasterSchedulerService extends Thread {
} }
} }
/**
* 1. get command by slot
* 2. donot handle command if slot is empty
*
* @throws Exception
*/
private void scheduleProcess() throws Exception { private void scheduleProcess() throws Exception {
try { int activeCount = masterExecService.getActiveCount();
masterRegistryClient.blockAcquireMutex(); // make sure to scan and delete command table in one transaction
Command command = findOneCommand();
int activeCount = masterExecService.getActiveCount(); if (command != null) {
// make sure to scan and delete command table in one transaction logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());
Command command = processService.findOneCommand(); try {
if (command != null) { ProcessInstance processInstance = processService.handleCommand(logger,
logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); getLocalAddress(),
this.masterConfig.getMasterExecThreads() - activeCount, command);
try { if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
ProcessInstance processInstance = processService.handleCommand(logger, processInstance
getLocalAddress(), , processService
this.masterConfig.getMasterExecThreads() - activeCount, command); , nettyExecutorManager
if (processInstance != null) { , processAlertManager
logger.info("start master exec thread , split DAG ..."); , masterConfig
masterExecService.execute( , taskTimeoutCheckList);
new MasterExecThread(
processInstance this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread);
, processService if (processInstance.getTimeout() > 0) {
, nettyRemotingClient this.processTimeoutCheckList.put(processInstance.getId(), processInstance);
, processAlertManager
, masterConfig));
} }
} catch (Exception e) { logger.info("command {} process {} start...",
logger.error("scan command error ", e); command.getId(), processInstance.getId());
processService.moveToErrorCommand(command, e.toString()); masterExecService.execute(workflowExecuteThread);
} }
} else { } catch (Exception e) {
//indicate that no command ,sleep for 1s logger.error("scan command error ", e);
Thread.sleep(Constants.SLEEP_TIME_MILLIS); processService.moveToErrorCommand(command, e.toString());
}
} else {
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
private Command findOneCommand() {
int pageNumber = 0;
Command result = null;
while (Stopper.isRunning()) {
if (ServerNodeManager.MASTER_SIZE == 0) {
return null;
}
List<Command> commandList = processService.findCommandPage(ServerNodeManager.MASTER_SIZE, pageNumber);
if (commandList.size() == 0) {
return null;
}
for (Command command : commandList) {
int slot = ServerNodeManager.getSlot();
if (ServerNodeManager.MASTER_SIZE != 0
&& command.getId() % ServerNodeManager.MASTER_SIZE == slot) {
result = command;
break;
}
}
if (result != null) {
logger.info("find command {}, slot:{} :",
result.getId(),
ServerNodeManager.getSlot());
break;
} }
} finally { pageNumber += 1;
masterRegistryClient.releaseLock();
} }
return result;
} }
private String getLocalAddress() { private String getLocalAddress() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Date;
import java.util.Set;
/**
* master task exec thread
*/
public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* taskInstance state manager
*/
private TaskInstanceCacheManager taskInstanceCacheManager;
/**
* netty executor manager
*/
private NettyExecutorManager nettyExecutorManager;
/**
* zookeeper register center
*/
private RegistryClient registryClient;
/**
* constructor of MasterTaskExecThread
*
* @param taskInstance task instance
*/
public MasterTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
this.registryClient = RegistryClient.getInstance();
}
/**
* get task instance
*
* @return TaskInstance
*/
@Override
public TaskInstance getTaskInstance() {
return this.taskInstance;
}
/**
* whether already Killed,default false
*/
private boolean alreadyKilled = false;
/**
* submit task instance and wait complete
*
* @return true is task quit is true
*/
@Override
public Boolean submitWaitComplete() {
Boolean result = false;
this.taskInstance = submit();
if (this.taskInstance == null) {
logger.error("submit task instance to mysql and queue failed , please check and fix it");
return result;
}
if (!this.taskInstance.getState().typeIsFinished()) {
result = waitTaskQuit();
}
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
logger.info("task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
return result;
}
/**
* polling db
* <p>
* wait task quit
*
* @return true if task quit success
*/
public Boolean waitTaskQuit() {
// query new state
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
while (Stopper.isRunning()) {
try {
if (this.processInstance == null) {
logger.error("process instance not exists , master task exec thread exit");
return true;
}
// task instance add queue , waiting worker to kill
if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
cancelTaskInstance();
}
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
pauseTask();
}
// task instance finished
if (taskInstance.getState().typeIsFinished()) {
// if task is final result , then remove taskInstance from cache
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
break;
}
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
}
// updateProcessInstance task instance
//issue#5539 Check status of taskInstance from cache
taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("exception", e);
if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
}
return true;
}
/**
* pause task if task have not been dispatched to worker, do not dispatch anymore.
*/
public void pauseTask() {
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if (taskInstance == null) {
return;
}
if (StringUtils.isBlank(taskInstance.getHost())) {
taskInstance.setState(ExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
}
/**
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance() throws Exception {
if (alreadyKilled) {
return;
}
alreadyKilled = true;
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if (StringUtils.isBlank(taskInstance.getHost())) {
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
return;
}
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
logger.info("master kill taskInstance name :{} taskInstance id:{}",
taskInstance.getName(), taskInstance.getId());
}
/**
* whether exists valid worker group
*
* @param taskInstanceWorkerGroup taskInstanceWorkerGroup
* @return whether exists
*/
public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup) {
Set<String> workerGroups = registryClient.getWorkerGroupDirectly();
// not worker group
if (CollectionUtils.isEmpty(workerGroups)) {
return false;
}
// has worker group , but not taskInstance assigned worker group
if (!workerGroups.contains(taskInstanceWorkerGroup)) {
return false;
}
Set<String> workers = registryClient.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup);
if (CollectionUtils.isEmpty(workers)) {
return false;
}
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.hadoop.util.ThreadUtil;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 1. timeout check wheel
* 2. dependent task check wheel
*/
public class StateWheelExecuteThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList;
ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances,
ConcurrentHashMap<Integer, TaskInstance> taskInstances,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
int stateCheckIntervalSecs) {
this.processInstanceCheckList = processInstances;
this.taskInstanceCheckList = taskInstances;
this.processInstanceExecMaps = processInstanceExecMaps;
this.stateCheckIntervalSecs = stateCheckIntervalSecs;
}
@Override
public void run() {
logger.info("state wheel thread start");
while (Stopper.isRunning()) {
try {
checkProcess();
checkTask();
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
}
ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs);
}
}
public boolean addProcess(ProcessInstance processInstance) {
this.processInstanceCheckList.put(processInstance.getId(), processInstance);
return true;
}
public boolean addTask(TaskInstance taskInstance) {
this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance);
return true;
}
private void checkTask() {
if (taskInstanceCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) {
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 <= timeRemain && processTimeout(taskInstance)) {
taskInstanceCheckList.remove(taskInstance);
return;
}
}
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
processDependCheck(taskInstance);
}
}
}
private void checkProcess() {
if (processInstanceCheckList.isEmpty()) {
return;
}
for (ProcessInstance processInstance : this.processInstanceCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 <= timeRemain && processTimeout(processInstance)) {
processInstanceCheckList.remove(processInstance);
}
}
}
private void putEvent(StateEvent stateEvent) {
if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) {
return;
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent);
}
private boolean processDependCheck(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
putEvent(stateEvent);
return true;
}
private boolean processTimeout(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
putEvent(stateEvent);
return true;
}
private boolean processTimeout(ProcessInstance processInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
stateEvent.setProcessInstanceId(processInstance.getId());
putEvent(stateEvent);
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Date;
/**
* subflow task exec thread
*/
public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
/**
* sub process instance
*/
private ProcessInstance subProcessInstance;
/**
* sub process task exec thread
* @param taskInstance task instance
*/
public SubProcessTaskExecThread(TaskInstance taskInstance){
super(taskInstance);
}
@Override
public Boolean submitWaitComplete() {
Boolean result = false;
try{
// submit task instance
this.taskInstance = submit();
if(taskInstance == null){
logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");
return result;
}
setTaskInstanceState();
waitTaskQuit();
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
// at the end of the subflow , the task state is changed to the subflow state
if(subProcessInstance != null){
if(subProcessInstance.getState() == ExecutionStatus.STOP){
this.taskInstance.setState(ExecutionStatus.KILL);
}else{
this.taskInstance.setState(subProcessInstance.getState());
}
}
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
result = true;
}catch (Exception e){
logger.error("exception: ",e);
if (null != taskInstance) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
return result;
}
/**
* set task instance state
* @return
*/
private boolean setTaskInstanceState(){
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
if(subProcessInstance == null || taskInstance.getState().typeIsFinished()){
return false;
}
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
return true;
}
/**
* updateProcessInstance parent state
*/
private void updateParentProcessState(){
ProcessInstance parentProcessInstance = processService.findProcessInstanceById(this.processInstance.getId());
if(parentProcessInstance == null){
logger.error("parent work flow instance is null , please check it! work flow id {}", processInstance.getId());
return;
}
this.processInstance.setState(parentProcessInstance.getState());
}
/**
* wait task quit
* @throws InterruptedException
*/
private void waitTaskQuit() throws InterruptedException {
logger.info("wait sub work flow: {} complete", this.taskInstance.getName());
if (taskInstance.getState().typeIsFinished()) {
logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
this.taskInstance.getName(),
this.taskInstance.getState(),
this.processInstance.getState());
return;
}
while (Stopper.isRunning()) {
// waiting for subflow process instance establishment
if (subProcessInstance == null) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
if(!setTaskInstanceState()){
continue;
}
}
subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed();
}
updateParentProcessState();
if (subProcessInstance.getState().typeIsFinished()){
break;
}
if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){
// parent process "ready to pause" , child process "pause"
pauseSubProcess();
}else if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
// parent Process "Ready to Cancel" , subflow "Cancel"
stopSubProcess();
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
/**
* stop sub process
*/
private void stopSubProcess() {
if(subProcessInstance.getState() == ExecutionStatus.STOP ||
subProcessInstance.getState() == ExecutionStatus.READY_STOP){
return;
}
subProcessInstance.setState(ExecutionStatus.READY_STOP);
processService.updateProcessInstance(subProcessInstance);
}
/**
* pause sub process
*/
private void pauseSubProcess() {
if(subProcessInstance.getState() == ExecutionStatus.PAUSE ||
subProcessInstance.getState() == ExecutionStatus.READY_PAUSE){
return;
}
subProcessInstance.setState(ExecutionStatus.READY_PAUSE);
processService.updateProcessInstance(subProcessInstance);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class BaseTaskProcessor implements ITaskProcessor {
protected Logger logger = LoggerFactory.getLogger(getClass());
protected boolean killed = false;
protected boolean paused = false;
protected boolean timeout = false;
protected TaskInstance taskInstance = null;
protected ProcessInstance processInstance;
/**
* pause task, common tasks donot need this.
*
* @return
*/
protected abstract boolean pauseTask();
/**
* kill task, all tasks need to realize this function
*
* @return
*/
protected abstract boolean killTask();
/**
* task timeout process
* @return
*/
protected abstract boolean taskTimeout();
@Override
public void run() {
}
@Override
public boolean action(TaskAction taskAction) {
switch (taskAction) {
case STOP:
return stop();
case PAUSE:
return pause();
case TIMEOUT:
return timeout();
default:
logger.error("unknown task action: {}", taskAction.toString());
}
return false;
}
protected boolean timeout() {
if (timeout) {
return true;
}
timeout = taskTimeout();
return timeout;
}
/**
* @return
*/
protected boolean pause() {
if (paused) {
return true;
}
paused = pauseTask();
return paused;
}
protected boolean stop() {
if (killed) {
return true;
}
killed = killTask();
return killed;
}
@Override
public String getType() {
return null;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
public class CommonTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return Constants.COMMON_TASK_TYPE;
}
@Override
public ITaskProcessor create() {
return new CommonTaskProcessor();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.logging.log4j.util.Strings;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* common task processor
*/
public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
private TaskPriorityQueue taskUpdateQueue;
@Autowired
MasterConfig masterConfig;
@Autowired
NettyExecutorManager nettyExecutorManager;
/**
* logger of MasterBaseTaskExecThread
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
dispatchTask(taskInstance, processInstance);
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
@Override
public void run() {
}
@Override
protected boolean taskTimeout() {
return true;
}
/**
* common task cannot be paused
*
* @return
*/
@Override
protected boolean pauseTask() {
return true;
}
@Override
public String getType() {
return Constants.COMMON_TASK_TYPE;
}
private boolean dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) {
try {
if (taskUpdateQueue == null) {
this.initQueue();
}
if (taskInstance.getState().typeIsFinished()) {
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot be submitted because its execution state is RUNNING or DELAY.
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: {}", taskInstance);
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance));
return false;
}
}
public void initQueue() {
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
}
@Override
public boolean killTask() {
try {
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if (taskInstance == null) {
return true;
}
if (taskInstance.getState().typeIsFinished()) {
return true;
}
if (Strings.isBlank(taskInstance.getHost())) {
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
return true;
}
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
} catch (ExecuteException e) {
logger.error("kill task error:", e);
return false;
}
logger.info("master kill taskInstance name :{} taskInstance id:{}",
taskInstance.getName(), taskInstance.getId());
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
public class ConditionTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.CONDITIONS.getDesc();
}
@Override
public ITaskProcessor create() {
return new ConditionTaskProcessor();
}
}
...@@ -14,19 +14,27 @@ ...@@ -14,19 +14,27 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.runner;
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
...@@ -36,55 +44,121 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -36,55 +44,121 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { /**
* condition task processor
*/
public class ConditionTaskProcessor extends BaseTaskProcessor {
/** /**
* dependent parameters * dependent parameters
*/ */
private DependentParameters dependentParameters; private DependentParameters dependentParameters;
/** ProcessInstance processInstance;
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
/** /**
* condition result * condition result
*/ */
private DependResult conditionResult; private DependResult conditionResult = DependResult.WAITING;
/** /**
* constructor of MasterBaseTaskExecThread * complete task map
*
* @param taskInstance task instance
*/ */
public ConditionsTaskExecThread(TaskInstance taskInstance) { private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
super(taskInstance);
taskInstance.setStartTime(new Date()); protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
private TaskDefinition taskDefinition;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;
}
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
logger.info("dependent task start");
endTask();
return true;
} }
@Override @Override
public Boolean submitWaitComplete() { public ExecutionStatus taskState() {
try { return this.taskInstance.getState();
this.taskInstance = submit(); }
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(), @Override
processInstance.getProcessDefinitionVersion(), public void run() {
taskInstance.getProcessInstanceId(), if (conditionResult.equals(DependResult.WAITING)) {
taskInstance.getId())); setConditionResult();
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); } else {
Thread.currentThread().setName(threadLoggerInfoName); endTask();
initTaskParameters(); }
logger.info("dependent task start"); }
waitTaskQuit();
updateTaskState(); @Override
} catch (Exception e) { protected boolean pauseTask() {
logger.error("conditions task run exception", e); this.taskInstance.setState(ExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}
@Override
protected boolean taskTimeout() {
TaskTimeoutStrategy taskTimeoutStrategy =
taskDefinition.getTimeoutNotifyStrategy();
if (taskTimeoutStrategy == TaskTimeoutStrategy.WARN) {
return true;
} }
logger.info("condition task {} timeout, strategy {} ",
taskInstance.getId(), taskTimeoutStrategy.getDescp());
conditionResult = DependResult.FAILED;
endTask();
return true;
}
@Override
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true; return true;
} }
private void waitTaskQuit() { @Override
public String getType() {
return TaskType.CONDITIONS.getDesc();
}
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
}
private void setConditionResult() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId()); List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for (TaskInstance task : taskInstances) { for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getName(), task.getState()); completeTaskList.putIfAbsent(task.getName(), task.getState());
...@@ -103,32 +177,6 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { ...@@ -103,32 +177,6 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
logger.info("the conditions task depend result : {}", conditionResult); logger.info("the conditions task depend result : {}", conditionResult);
} }
/**
*
*/
private void updateTaskState() {
ExecutionStatus status;
if (this.cancel) {
status = ExecutionStatus.KILL;
} else {
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
}
/** /**
* depend result for depend item * depend result for depend item
...@@ -151,4 +199,13 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { ...@@ -151,4 +199,13 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
return dependResult; return dependResult;
} }
/**
*
*/
private void endTask() {
ExecutionStatus status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
public class DependentTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.DEPENDENT.getDesc();
}
@Override
public ITaskProcessor create() {
return new DependentTaskProcessor();
}
}
...@@ -15,22 +15,26 @@ ...@@ -15,22 +15,26 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
...@@ -38,11 +42,12 @@ import java.util.HashMap; ...@@ -38,11 +42,12 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
public class DependentTaskExecThread extends MasterBaseTaskExecThread { /**
* dependent task processor
*/
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters; private DependentParameters dependentParameters;
...@@ -57,43 +62,74 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { ...@@ -57,43 +62,74 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
*/ */
private Map<String, DependResult> dependResultMap = new HashMap<>(); private Map<String, DependResult> dependResultMap = new HashMap<>();
/** /**
* dependent date * dependent date
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date dependentDate; private Date dependentDate;
/** DependResult result;
* constructor of MasterBaseTaskExecThread
* ProcessInstance processInstance;
* @param taskInstance task instance TaskDefinition taskDefinition;
*/
public DependentTaskExecThread(TaskInstance taskInstance) { protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
super(taskInstance); MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
boolean allDependentItemFinished;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = task;
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;
}
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
initDependParameters();
return true;
} }
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
@Override
public void run() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
if (allDependentItemFinished) {
getTaskDependResult();
endTask();
}
}
@Override @Override
public Boolean submitWaitComplete() { protected boolean taskTimeout() {
try { TaskTimeoutStrategy taskTimeoutStrategy =
logger.info("dependent task start"); taskDefinition.getTimeoutNotifyStrategy();
this.taskInstance = submit(); if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, && TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
processInstance.getProcessDefinitionCode(), return true;
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
initDependParameters();
waitTaskQuit();
updateTaskState();
} catch (Exception e) {
logger.error("dependent task run exception", e);
} }
logger.info("dependent task {} timeout, strategy {} ",
taskInstance.getId(), taskTimeoutStrategy.getDescp());
result = DependResult.FAILED;
endTask();
return true; return true;
} }
...@@ -105,89 +141,27 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { ...@@ -105,89 +141,27 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) { for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation())); this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
} }
if (this.processInstance.getScheduleTime() != null) { if (processInstance.getScheduleTime() != null) {
this.dependentDate = this.processInstance.getScheduleTime(); this.dependentDate = this.processInstance.getScheduleTime();
} else { } else {
this.dependentDate = new Date(); this.dependentDate = new Date();
} }
} }
/** @Override
* protected boolean pauseTask() {
*/ this.taskInstance.setState(ExecutionStatus.PAUSE);
private void updateTaskState() { this.taskInstance.setEndTime(new Date());
ExecutionStatus status;
if (this.cancel) {
status = ExecutionStatus.KILL;
} else {
DependResult result = getTaskDependResult();
status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance); processService.saveTaskInstance(taskInstance);
}
/**
* wait dependent tasks quit
*/
private Boolean waitTaskQuit() {
logger.info("wait depend task : {} complete", this.taskInstance.getName());
if (taskInstance.getState().typeIsFinished()) {
logger.info("task {} already complete. task state:{}",
this.taskInstance.getName(),
this.taskInstance.getState());
return true;
}
while (Stopper.isRunning()) {
try {
if (this.processInstance == null) {
logger.error("process instance not exists , master task exec thread exit");
return true;
}
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed();
}
if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
cancelTaskInstance();
break;
}
if (allDependentTaskFinish() || taskInstance.getState().typeIsFinished()) {
break;
}
// update process task
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("exception", e);
if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
}
return true; return true;
} }
/** @Override
* cancel dependent task protected boolean killTask() {
*/ this.taskInstance.setState(ExecutionStatus.KILL);
private void cancelTaskInstance() { this.taskInstance.setEndTime(new Date());
this.cancel = true; processService.saveTaskInstance(taskInstance);
} return true;
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
} }
/** /**
...@@ -223,8 +197,24 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { ...@@ -223,8 +197,24 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); DependResult dependResult = dependentExecute.getModelDependResult(dependentDate);
dependResultList.add(dependResult); dependResultList.add(dependResult);
} }
DependResult result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList); result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList);
logger.info("dependent task completed, dependent result:{}", result); logger.info("dependent task completed, dependent result:{}", result);
return result; return result;
} }
}
\ No newline at end of file /**
*
*/
private void endTask() {
ExecutionStatus status;
status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
}
@Override
public String getType() {
return TaskType.DEPENDENT.getDesc();
}
}
...@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; ...@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
...@@ -124,6 +125,7 @@ public class WorkerServer implements IStoppable { ...@@ -124,6 +125,7 @@ public class WorkerServer implements IStoppable {
serverConfig.setListenPort(workerConfig.getListenPort()); serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService)); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService));
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUST, new HostUpdateProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册