未验证 提交 2f010258 编写于 作者: F felix.wang 提交者: GitHub

[Fix-3299][dao&server] Fix 3299,when Json string parsing problem caused by...

[Fix-3299][dao&server] Fix 3299,when Json string parsing problem caused by non-standard json format. (#3552)

* #3299  Json string parsing problem caused by non-standard json format.

* #3299  Json string parsing problem caused by non-standard json format.

* #3299  Json string parsing problem caused by non-standard json format. fix  code style

* #3299  Json string parsing problem caused by non-standard json format. fix  code style
Co-authored-by: Nwangjianda <Felix@thinkingdata.com>
上级 d30bc7bc
......@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao;
package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
......@@ -30,13 +29,17 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class AlertDao extends AbstractBaseDao {
......@@ -56,21 +59,23 @@ public class AlertDao extends AbstractBaseDao {
/**
* insert alert
*
* @param alert alert
* @return add alert result
*/
public int addAlert(Alert alert){
public int addAlert(Alert alert) {
return alertMapper.insert(alert);
}
/**
* update alert
*
* @param alertStatus alertStatus
* @param log log
* @param id id
* @return update alert result
*/
public int updateAlert(AlertStatus alertStatus,String log,int id){
public int updateAlert(AlertStatus alertStatus, String log, int id) {
Alert alert = alertMapper.selectById(id);
alert.setAlertStatus(alertStatus);
alert.setUpdateTime(new Date());
......@@ -80,46 +85,61 @@ public class AlertDao extends AbstractBaseDao {
/**
* query user list by alert group id
*
* @param alerGroupId alerGroupId
* @return user list
*/
public List<User> queryUserByAlertGroupId(int alerGroupId){
public List<User> queryUserByAlertGroupId(int alerGroupId) {
return userAlertGroupMapper.listUserByAlertgroupId(alerGroupId);
}
/**
* MasterServer or WorkerServer stoped
*
* @param alertgroupId alertgroupId
* @param host host
* @param serverType serverType
*/
public void sendServerStopedAlert(int alertgroupId,String host,String serverType){
public void sendServerStopedAlert(int alertgroupId, String host, String serverType) {
Alert alert = new Alert();
String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]",
serverType, host);
List<LinkedHashMap> serverStopList = new ArrayList<>(1);
LinkedHashMap<String, String> serverStopedMap = new LinkedHashMap();
serverStopedMap.put("type", serverType);
serverStopedMap.put("host", host);
serverStopedMap.put("event", "server down");
serverStopedMap.put("warning level", "serious");
serverStopList.add(serverStopedMap);
String content = JSONUtils.toJsonString(serverStopList);
alert.setTitle("Fault tolerance warning");
saveTaskTimeoutAlert(alert, content, alertgroupId, null, null);
}
/**
* process time out alert
*
* @param processInstance processInstance
* @param processDefinition processDefinition
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
int alertgroupId = processInstance.getWarningGroupId();
String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc();
Alert alert = new Alert();
String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
processInstance.getId(), processInstance.getName());
List<LinkedHashMap> processTimeoutList = new ArrayList<>(1);
LinkedHashMap<String, String> processTimeoutMap = new LinkedHashMap();
processTimeoutMap.put("id", String.valueOf(processInstance.getId()));
processTimeoutMap.put("name", processInstance.getName());
processTimeoutMap.put("event", "timeout");
processTimeoutMap.put("warnLevel", "middle");
processTimeoutList.add(processTimeoutMap);
String content = JSONUtils.toJsonString(processTimeoutList);
alert.setTitle("Process Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc);
}
private void saveTaskTimeoutAlert(Alert alert, String content, int alertgroupId,
String receivers, String receiversCc){
private void saveTaskTimeoutAlert(Alert alert, String content, int alertgroupId,
String receivers, String receiversCc) {
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
......@@ -135,9 +155,9 @@ public class AlertDao extends AbstractBaseDao {
alertMapper.insert(alert);
}
/**
* task timeout warn
*
* @param alertgroupId alertgroupId
* @param receivers receivers
* @param receiversCc receiversCc
......@@ -146,34 +166,45 @@ public class AlertDao extends AbstractBaseDao {
* @param taskId taskId
* @param taskName taskName
*/
public void sendTaskTimeoutAlert(int alertgroupId,String receivers,String receiversCc, int processInstanceId,
String processInstanceName, int taskId,String taskName){
public void sendTaskTimeoutAlert(int alertgroupId, String receivers, String receiversCc, int processInstanceId,
String processInstanceName, int taskId, String taskName) {
Alert alert = new Alert();
String content = String.format("[{'process instance id':'%d','task name':'%s','task id':'%d','task name':'%s'," +
"'event':'timeout','warnLevel':'middle'}]", processInstanceId, processInstanceName, taskId, taskName);
List<LinkedHashMap> taskTimeoutList = new ArrayList<>(1);
LinkedHashMap<String, String> taskTimeoutMap = new LinkedHashMap();
taskTimeoutMap.put("process instance id", String.valueOf(processInstanceId));
taskTimeoutMap.put("process name", processInstanceName);
taskTimeoutMap.put("task id", String.valueOf(taskId));
taskTimeoutMap.put("task name", taskName);
taskTimeoutMap.put("event", "timeout");
taskTimeoutMap.put("warnLevel", "middle");
taskTimeoutList.add(taskTimeoutMap);
String content = JSONUtils.toJsonString(taskTimeoutList);
alert.setTitle("Task Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc);
}
/**
* list the alert information of waiting to be executed
*
* @return alert list
*/
public List<Alert> listWaitExecutionAlert(){
public List<Alert> listWaitExecutionAlert() {
return alertMapper.listAlertByStatus(AlertStatus.WAIT_EXECUTION);
}
/**
* list user information by alert group id
*
* @param alertgroupId alertgroupId
* @return user list
*/
public List<User> listUserByAlertgroupId(int alertgroupId){
public List<User> listUserByAlertgroupId(int alertgroupId) {
return userAlertGroupMapper.listUserByAlertgroupId(alertgroupId);
}
/**
* for test
*
* @return AlertMapper
*/
public AlertMapper getAlertMapper() {
......
......@@ -14,29 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* alert manager
*/
......@@ -50,8 +51,7 @@ public class AlertManager {
/**
* alert dao
*/
private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
/**
* command type convert chinese
......@@ -86,50 +86,37 @@ public class AlertManager {
}
}
/**
* process instance format
*/
private static final String PROCESS_INSTANCE_FORMAT =
"\"id:%d\"," +
"\"name:%s\"," +
"\"job type: %s\"," +
"\"state: %s\"," +
"\"recovery:%s\"," +
"\"run time: %d\"," +
"\"start time: %s\"," +
"\"end time: %s\"," +
"\"host: %s\"" ;
/**
* get process instance content
* @param processInstance process instance
* @param taskInstances task instance list
*
* @param processInstance process instance
* @param taskInstances task instance list
* @return process instance format content
*/
public String getContentProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances){
List<TaskInstance> taskInstances) {
String res = "";
if(processInstance.getState().typeIsSuccess()){
res = String.format(PROCESS_INSTANCE_FORMAT,
processInstance.getId(),
processInstance.getName(),
getCommandCnName(processInstance.getCommandType()),
processInstance.getState().toString(),
processInstance.getRecovery().toString(),
processInstance.getRunTimes(),
DateUtils.dateToString(processInstance.getStartTime()),
DateUtils.dateToString(processInstance.getEndTime()),
processInstance.getHost()
);
res = "[" + res + "]";
}else if(processInstance.getState().typeIsFailure()){
if (processInstance.getState().typeIsSuccess()) {
List<LinkedHashMap> successTaskList = new ArrayList<>(1);
LinkedHashMap<String, String> successTaskMap = new LinkedHashMap();
successTaskMap.put("id", String.valueOf(processInstance.getId()));
successTaskMap.put("name", processInstance.getName());
successTaskMap.put("job type", getCommandCnName(processInstance.getCommandType()));
successTaskMap.put("state", processInstance.getState().toString());
successTaskMap.put("recovery", processInstance.getRecovery().toString());
successTaskMap.put("run time", String.valueOf(processInstance.getRunTimes()));
successTaskMap.put("start time", DateUtils.dateToString(processInstance.getStartTime()));
successTaskMap.put("end time", DateUtils.dateToString(processInstance.getEndTime()));
successTaskMap.put("host", processInstance.getHost());
successTaskList.add(successTaskMap);
res = JSONUtils.toJsonString(successTaskList);
} else if (processInstance.getState().typeIsFailure()) {
List<LinkedHashMap> failedTaskList = new ArrayList<>();
for(TaskInstance task : taskInstances){
if(task.getState().typeIsSuccess()){
for (TaskInstance task : taskInstances) {
if (task.getState().typeIsSuccess()) {
continue;
}
LinkedHashMap<String, String> failedTaskMap = new LinkedHashMap();
......@@ -154,15 +141,15 @@ public class AlertManager {
/**
* getting worker fault tolerant content
*
* @param processInstance process instance
* @param processInstance process instance
* @param toleranceTaskList tolerance task list
* @return worker tolerance content
*/
private String getWorkerToleranceContent(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
private String getWorkerToleranceContent(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList) {
List<LinkedHashMap<String, String>> toleranceTaskInstanceList = new ArrayList<>();
List<LinkedHashMap<String, String>> toleranceTaskInstanceList = new ArrayList<>();
for(TaskInstance taskInstance: toleranceTaskList){
for (TaskInstance taskInstance : toleranceTaskList) {
LinkedHashMap<String, String> toleranceWorkerContentMap = new LinkedHashMap();
toleranceWorkerContentMap.put("process name", processInstance.getName());
toleranceWorkerContentMap.put("task name", taskInstance.getName());
......@@ -176,11 +163,11 @@ public class AlertManager {
/**
* send worker alert fault tolerance
*
* @param processInstance process instance
* @param processInstance process instance
* @param toleranceTaskList tolerance task list
*/
public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
try{
public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList) {
try {
Alert alert = new Alert();
alert.setTitle("worker fault tolerance");
alert.setShowType(ShowType.TABLE);
......@@ -188,13 +175,13 @@ public class AlertManager {
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setCreateTime(new Date());
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId());
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());
alertDao.addAlert(alert);
logger.info("add alert to db , alert : {}", alert.toString());
}catch (Exception e){
} catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage());
}
......@@ -202,40 +189,40 @@ public class AlertManager {
/**
* send process instance alert
* @param processInstance process instance
* @param taskInstances task instance list
*
* @param processInstance process instance
* @param taskInstances task instance list
*/
public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances){
List<TaskInstance> taskInstances) {
boolean sendWarnning = false;
WarningType warningType = processInstance.getWarningType();
switch (warningType){
switch (warningType) {
case ALL:
if(processInstance.getState().typeIsFinished()){
if (processInstance.getState().typeIsFinished()) {
sendWarnning = true;
}
break;
case SUCCESS:
if(processInstance.getState().typeIsSuccess()){
if (processInstance.getState().typeIsSuccess()) {
sendWarnning = true;
}
break;
case FAILURE:
if(processInstance.getState().typeIsFailure()){
if (processInstance.getState().typeIsFailure()) {
sendWarnning = true;
}
break;
default:
default:
}
if(!sendWarnning){
if (!sendWarnning) {
return;
}
Alert alert = new Alert();
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "success" :"failed";
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success);
ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE;
alert.setShowType(showType);
......@@ -254,7 +241,7 @@ public class AlertManager {
/**
* send process timeout alert
*
* @param processInstance process instance
* @param processInstance process instance
* @param processDefinition process definition
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册