未验证 提交 d6798c6d 编写于 作者: Q qiaozhanwei 提交者: GitHub

1,task status statistics and process status statistics bug fix (#2357) ...

1,task status statistics and process status statistics bug fix (#2357)  2,worker group bug fix (#2430)

* dispatch task fail will set task status failed

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix

* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 8b4eb20b
......@@ -106,14 +106,12 @@ public class DataAnalysisService extends BaseService{
List<ExecuteStatusCount> taskInstanceStateCounts =
taskInstanceMapper.countTaskInstanceStateByUser(start, end, projectIds);
if (taskInstanceStateCounts != null && !taskInstanceStateCounts.isEmpty()) {
if (taskInstanceStateCounts != null) {
TaskCountDto taskCountResult = new TaskCountDto(taskInstanceStateCounts);
result.put(Constants.DATA_LIST, taskCountResult);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.TASK_INSTANCE_STATE_COUNT_ERROR);
}
return result;
return result;
}
private void putErrorRequestParamsMsg(Map<String, Object> result) {
......@@ -153,14 +151,12 @@ public class DataAnalysisService extends BaseService{
processInstanceMapper.countInstanceStateByUser(start, end,
projectIdArray);
if (processInstanceStateCounts != null && !processInstanceStateCounts.isEmpty()) {
if (processInstanceStateCounts != null) {
TaskCountDto taskCountResult = new TaskCountDto(processInstanceStateCounts);
result.put(Constants.DATA_LIST, taskCountResult);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.COUNT_PROCESS_INSTANCE_STATE_ERROR);
}
return result;
return result;
}
......@@ -234,7 +230,7 @@ public class DataAnalysisService extends BaseService{
// count error command state
List<CommandCount> errorCommandStateCounts =
errorCommandMapper.countCommandState(
start, end, projectIdArray);
start, end, projectIdArray);
//
Map<CommandType,Map<String,Integer>> dataMap = new HashMap<>();
......
......@@ -114,9 +114,6 @@ public class DataAnalysisServiceTest {
Map<String, Object> result = dataAnalysisService.countTaskStateByProject(user, 2, startDate, endDate);
Assert.assertTrue(result.isEmpty());
// task instance state count error
result = dataAnalysisService.countTaskStateByProject(user, 1, startDate, endDate);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_COUNT_ERROR,result.get(Constants.STATUS));
//SUCCESS
Mockito.when(taskInstanceMapper.countTaskInstanceStateByUser(DateUtils.getScheduleDate(startDate),
......@@ -137,10 +134,6 @@ public class DataAnalysisServiceTest {
Map<String, Object> result = dataAnalysisService.countProcessInstanceStateByProject(user,2,startDate,endDate);
Assert.assertTrue(result.isEmpty());
//COUNT_PROCESS_INSTANCE_STATE_ERROR
result = dataAnalysisService.countProcessInstanceStateByProject(user,1,startDate,endDate);
Assert.assertEquals(Status.COUNT_PROCESS_INSTANCE_STATE_ERROR,result.get(Constants.STATUS));
//SUCCESS
Mockito.when(processInstanceMapper.countInstanceStateByUser(DateUtils.getScheduleDate(startDate),
DateUtils.getScheduleDate(endDate), new Integer[]{1})).thenReturn(getTaskInstanceStateCounts());
......
......@@ -223,4 +223,14 @@ public class ThreadUtils {
}
return id + " (" + name + ")";
}
/**
* sleep
* @param millis millis
*/
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignore) {}
}
}
......@@ -111,7 +111,7 @@ public class Command {
/**
* worker group
*/
@TableField(exist = false)
@TableField("worker_group")
private String workerGroup;
public Command() {
......
......@@ -76,7 +76,8 @@ public class CommandMapperTest {
//query
Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertEquals(expectedCommand, actualCommand);
assertNotNull(actualCommand);
assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
}
/**
......@@ -94,7 +95,8 @@ public class CommandMapperTest {
Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertEquals(expectedCommand,actualCommand);
assertNotNull(actualCommand);
assertEquals(expectedCommand.getUpdateTime(),actualCommand.getUpdateTime());
}
......@@ -127,13 +129,6 @@ public class CommandMapperTest {
List<Command> actualCommands = commandMapper.selectList(null);
assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
for (Command actualCommand : actualCommands){
Command expectedCommand = commandMap.get(actualCommand.getId());
if (expectedCommand != null){
assertEquals(expectedCommand,actualCommand);
}
}
}
/**
......
......@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
......@@ -122,9 +123,7 @@ public class TaskPriorityQueueConsumer extends Thread{
result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("dispatch error",e);
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e1) {}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
if (result){
......
......@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
......@@ -101,10 +102,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
if (taskInstance != null && ackStatus.typeIsRunning()){
break;
}
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
......@@ -99,10 +100,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
if (taskInstance != null && responseStatus.typeIsFinished()){
break;
}
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
}
......
......@@ -18,11 +18,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
......@@ -37,6 +37,8 @@ import org.springframework.stereotype.Service;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* taks callback service
*/
......@@ -98,11 +100,7 @@ public class TaskCallbackService {
while (Stopper.isRunning()) {
if (CollectionUtils.isEmpty(masterNodes)) {
logger.error("no available master node");
try {
Thread.sleep(1000);
}catch (Exception e){
}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}else {
break;
}
......
......@@ -234,7 +234,7 @@ CREATE TABLE t_ds_command (
dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
worker_group varchar(64),
PRIMARY KEY (id)
) ;
......@@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command (
update_time timestamp DEFAULT NULL ,
dependence text ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
worker_group varchar(64),
message text ,
PRIMARY KEY (id)
);
......@@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_useruser : admin , password : dolphinscheduler123
-- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group
......
......@@ -333,7 +333,7 @@ CREATE TABLE `t_ds_command` (
`dependence` varchar(255) DEFAULT NULL COMMENT 'dependence',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
`worker_group` varchar(64) COMMENT 'worker group',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
......@@ -380,7 +380,7 @@ CREATE TABLE `t_ds_error_command` (
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`dependence` text COMMENT 'dependence',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority, 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
`worker_group` varchar(64) COMMENT 'worker group',
`message` text COMMENT 'message',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册