提交 5ab9c8d5 编写于 作者: B break60
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
name: Bug report name: Bug report
about: Create a report to help us improve about: Create a report to help us improve
title: "[Bug][Module Name] Bug title " title: "[Bug][Module Name] Bug title "
labels: bug
assignees: '' assignees: ''
--- ---
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
name: Feature request name: Feature request
about: Suggest an idea for this project about: Suggest an idea for this project
title: "[Feature][Module Name] Feature title" title: "[Feature][Module Name] Feature title"
labels: new feature
assignees: '' assignees: ''
--- ---
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
name: Improvement suggestion name: Improvement suggestion
about: Improvement suggestion for this project about: Improvement suggestion for this project
title: "[Improvement][Module Name] Improvement title" title: "[Improvement][Module Name] Improvement title"
labels: improvement
assignees: '' assignees: ''
--- ---
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
name: Question name: Question
about: Have a question wanted to be help about: Have a question wanted to be help
title: "[Question] Question title" title: "[Question] Question title"
labels: question
assignees: '' assignees: ''
--- ---
......
...@@ -7,46 +7,44 @@ Dolphin Scheduler Official Website ...@@ -7,46 +7,44 @@ Dolphin Scheduler Official Website
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=apache-dolphinscheduler&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=apache-dolphinscheduler&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler)
> Dolphin Scheduler for Big Data
[![Stargazers over time](https://starchart.cc/apache/incubator-dolphinscheduler.svg)](https://starchart.cc/apache/incubator-dolphinscheduler) [![Stargazers over time](https://starchart.cc/apache/incubator-dolphinscheduler.svg)](https://starchart.cc/apache/incubator-dolphinscheduler)
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](README.md) [![EN doc](https://img.shields.io/badge/document-English-blue.svg)](README.md)
[![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](README_zh_CN.md) [![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](README_zh_CN.md)
### Design features: ### Design Features:
Dolphin Scheduler is a distributed and easy-to-extend visual DAG workflow scheduling system. It dedicates to solving the complex dependencies in data processing to make the scheduling system `out of the box` for the data processing process. DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing various types of jobs available `out of the box`.
Its main objectives are as follows: Its main objectives are as follows:
- Associate the tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of the task in real-time. - Associate the tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of the task in real-time.
- Support many task types: Shell, MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Sub_Process, Procedure, etc. - Support various task types: Shell, MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Sub_Process, Procedure, etc.
- Support process scheduling, dependency scheduling, manual scheduling, manual pause/stop/recovery, support for failed retry/alarm, recovery from specified nodes, Kill task, etc. - Support scheduling of workflows and dependencies, manual scheduling to pause/stop/recover task, support failure task retry/alarm, recover specified nodes from failure, kill task, etc.
- Support the priority of process & task, task failover, and task timeout alarm or failure. - Support the priority of workflows & tasks, task failover, and task timeout alarm or failure.
- Support process global parameters and node custom parameter settings. - Support workflow global parameters and node customized parameter settings.
- Support online upload/download of resource files, management, etc. Support online file creation and editing. - Support online upload/download/management of resource files, etc. Support online file creation and editing.
- Support task log online viewing and scrolling, online download log, etc. - Support task log online viewing and scrolling and downloading, etc.
- Implement cluster HA, decentralize Master cluster and Worker cluster through Zookeeper. - Have implemented cluster HA, decentralize Master cluster and Worker cluster through Zookeeper.
- Support the viewing of Master/Worker CPU load, memory, and CPU usage metrics. - Support the viewing of Master/Worker CPU load, memory, and CPU usage metrics.
- Support presenting tree or Gantt chart of workflow history as well as the statistics results of task & process status in each workflow. - Support displaying workflow history in tree/Gantt chart, as well as statistical analysis on the task status & process status in each workflow.
- Support backfilling data. - Support back-filling data.
- Support multi-tenant. - Support multi-tenant.
- Support internationalization. - Support internationalization.
- There are more waiting for partners to explore... - More features waiting for partners to explore...
### What's in Dolphin Scheduler ### What's in DolphinScheduler
Stability | Easy to use | Features | Scalability | Stability | Easy to use | Features | Scalability |
-- | -- | -- | -- -- | -- | -- | --
Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables, and so on at a glance.  |  Support pause, recover operation | Support custom task types Decentralized multi-master and multi-worker | Visualization of workflow key information, such as task status, task type, retry times, task operation machine information, visual variables, and so on at a glance.  |  Support pause, recover operation | Support customized task types
HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the API mode operation is provided. | Users on Dolphin Scheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. support HA | Visualization of all workflow operations, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, provide API mode operations. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler supports distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic adjustment.
Overload processing: Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | | Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | |
### System partial screenshot ### User Interface Screenshots
![home page](https://user-images.githubusercontent.com/15833811/75218288-bf286400-57d4-11ea-8263-d639c6511d5f.jpg) ![home page](https://user-images.githubusercontent.com/15833811/75218288-bf286400-57d4-11ea-8263-d639c6511d5f.jpg)
![dag](https://user-images.githubusercontent.com/15833811/75236750-3374fe80-57f9-11ea-857d-62a66a5a559d.png) ![dag](https://user-images.githubusercontent.com/15833811/75236750-3374fe80-57f9-11ea-857d-62a66a5a559d.png)
...@@ -57,13 +55,9 @@ Overload processing: Overload processing: By using the task queue mechanism, the ...@@ -57,13 +55,9 @@ Overload processing: Overload processing: By using the task queue mechanism, the
![security](https://user-images.githubusercontent.com/15833811/75236441-bfd2f180-57f8-11ea-88bd-f24311e01b7e.png) ![security](https://user-images.githubusercontent.com/15833811/75236441-bfd2f180-57f8-11ea-88bd-f24311e01b7e.png)
![treeview](https://user-images.githubusercontent.com/15833811/75217191-3fe56100-57d1-11ea-8856-f19180d9a879.png) ![treeview](https://user-images.githubusercontent.com/15833811/75217191-3fe56100-57d1-11ea-8856-f19180d9a879.png)
### QuickStart in Docker
Please referer the official website document:[[QuickStart in Docker](https://dolphinscheduler.apache.org/en-us/docs/1.3.4/user_doc/docker-deployment.html)]
### Recent R&D plan
The work plan of Dolphin Scheduler: [R&D plan](https://github.com/apache/incubator-dolphinscheduler/projects/1), which `In Develop` card shows the features that are currently being developed and TODO card lists what needs to be done(including feature ideas).
### How to contribute
Welcome to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/docs/development/contribute.html)]
### How to Build ### How to Build
...@@ -80,14 +74,16 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release ...@@ -80,14 +74,16 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release
### Thanks ### Thanks
Dolphin Scheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on. DolphinScheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
We would like to express our deep gratitude to all the open-source projects which contribute to making the dream of Dolphin Scheduler comes true. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we expect the partners who have the same passion and conviction to open-source will join in and contribute to the open-source community! We would like to express our deep gratitude to all the open-source projects used in Dolphin Scheduler. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we hope everyone who have the same enthusiasm and passion for open source could join in and contribute to the open-source community!
### Get Help ### Get Help
1. Submit an issue 1. Submit an [[issue](https://github.com/apache/incubator-dolphinscheduler/issues/new/choose)]
1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org 1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org
### How to Contribute
The community welcomes everyone to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/community/development/contribute.html)]
### License ### License
Please refer to the [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file. Please refer to the [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file.
......
...@@ -15,53 +15,64 @@ ...@@ -15,53 +15,64 @@
~ limitations under the License. ~ limitations under the License.
--> -->
<configuration> <configuration>
<property> <property>
<name>worker.exec.threads</name> <name>worker.exec.threads</name>
<value>100</value> <value>100</value>
<value-attributes> <value-attributes>
<type>int</type> <type>int</type>
</value-attributes> </value-attributes>
<description>worker execute thread num</description> <description>worker execute thread num</description>
<on-ambari-upgrade add="true"/> <on-ambari-upgrade add="true"/>
</property> </property>
<property> <property>
<name>worker.heartbeat.interval</name> <name>worker.heartbeat.interval</name>
<value>10</value> <value>10</value>
<value-attributes> <value-attributes>
<type>int</type> <type>int</type>
</value-attributes> </value-attributes>
<description>worker heartbeat interval</description> <description>worker heartbeat interval</description>
<on-ambari-upgrade add="true"/> <on-ambari-upgrade add="true"/>
</property> </property>
<property> <property>
<name>worker.max.cpuload.avg</name> <name>worker.max.cpuload.avg</name>
<value>100</value> <value>100</value>
<value-attributes> <value-attributes>
<type>int</type> <type>int</type>
</value-attributes> </value-attributes>
<description>only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2</description> <description>only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
<on-ambari-upgrade add="true"/> </description>
</property> <on-ambari-upgrade add="true"/>
<property> </property>
<name>worker.reserved.memory</name> <property>
<value>0.3</value> <name>worker.reserved.memory</name>
<description>only larger than reserved memory, worker server can work. default value : physical memory * 1/10, unit is G.</description> <value>0.3</value>
<on-ambari-upgrade add="true"/> <description>only larger than reserved memory, worker server can work. default value : physical memory * 1/10,
</property> unit is G.
</description>
<property> <on-ambari-upgrade add="true"/>
<name>worker.listen.port</name> </property>
<value>1234</value> <property>
<value-attributes> <name>worker.listen.port</name>
<type>int</type> <value>1234</value>
</value-attributes> <value-attributes>
<description>worker listen port</description> <type>int</type>
<on-ambari-upgrade add="true"/> </value-attributes>
</property> <description>worker listen port</description>
<property> <on-ambari-upgrade add="true"/>
<name>worker.groups</name> </property>
<value>default</value> <property>
<description>default worker group</description> <name>worker.groups</name>
<on-ambari-upgrade add="true"/> <value>default</value>
</property> <description>default worker group</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.weigth</name>
<value>100</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description>worker weight</description>
<on-ambari-upgrade add="true"/>
</property>
</configuration> </configuration>
\ No newline at end of file
...@@ -22,36 +22,24 @@ package org.apache.dolphinscheduler.plugin.alert.dingtalk; ...@@ -22,36 +22,24 @@ package org.apache.dolphinscheduler.plugin.alert.dingtalk;
*/ */
public class DingTalkParamsConstants { public class DingTalkParamsConstants {
static final String DING_TALK_PROXY_ENABLE = "isEnableProxy";
static final String DING_TALK_WEB_HOOK = "webhook";
static final String NAME_DING_TALK_WEB_HOOK = "WebHook";
static final String DING_TALK_KEYWORD = "keyword";
static final String NAME_DING_TALK_KEYWORD = "Keyword";
static final String NAME_DING_TALK_PROXY_ENABLE = "IsEnableProxy";
static final String DING_TALK_PROXY = "proxy";
static final String NAME_DING_TALK_PROXY = "Proxy";
static final String DING_TALK_PORT = "port";
static final String NAME_DING_TALK_PORT = "Port";
static final String DING_TALK_USER = "user";
static final String NAME_DING_TALK_USER = "User";
static final String DING_TALK_PASSWORD = "password";
static final String NAME_DING_TALK_PASSWORD = "Password";
private DingTalkParamsConstants() { private DingTalkParamsConstants() {
throw new IllegalStateException("Utility class"); throw new IllegalStateException("Utility class");
} }
static final String DING_TALK_WEB_HOOK = "dingtalk.webhook";
static final String NAME_DING_TALK_WEB_HOOK = "dingTalkWebHook";
static final String DING_TALK_KEYWORD = "dingtalk.keyword";
static final String NAME_DING_TALK_KEYWORD = "dingTalkKeyword";
public static final String DING_TALK_PROXY_ENABLE = "dingtalk.isEnableProxy";
static final String NAME_DING_TALK_PROXY_ENABLE = "dingTalkIsEnableProxy";
static final String DING_TALK_PROXY = "dingtalk.proxy";
static final String NAME_DING_TALK_PROXY = "dingTalkProxy";
static final String DING_TALK_PORT = "dingtalk.port";
static final String NAME_DING_TALK_PORT = "dingTalkPort";
static final String DING_TALK_USER = "dingtalk.user";
static final String NAME_DING_TALK_USER = "dingTalkUser";
static final String DING_TALK_PASSWORD = "dingtalk.password";
static final String NAME_DING_TALK_PASSWORD = "dingTalkPassword";
} }
...@@ -75,51 +75,6 @@ public class DingTalkSender { ...@@ -75,51 +75,6 @@ public class DingTalkSender {
} }
public AlertResult sendDingTalkMsg(String msg, String charset) {
AlertResult alertResult;
try {
String resp = sendMsg(msg, charset);
return checkSendDingTalkSendMsgResult(resp);
} catch (Exception e) {
logger.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;
}
private String sendMsg(String msg, String charset) throws IOException {
String msgToJson = textToJsonString(msg + "#" + keyword);
HttpPost httpPost = constructHttpPost(url, msgToJson, charset);
CloseableHttpClient httpClient;
if (Boolean.TRUE.equals(enableProxy)) {
httpClient = getProxyClient(proxy, port, user, password);
RequestConfig rcf = getProxyConfig(proxy, port);
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send {}, resp: {}", msg, resp);
return resp;
} finally {
httpClient.close();
}
}
private static HttpPost constructHttpPost(String url, String msg, String charset) { private static HttpPost constructHttpPost(String url, String msg, String charset) {
HttpPost post = new HttpPost(url); HttpPost post = new HttpPost(url);
StringEntity entity = new StringEntity(msg, charset); StringEntity entity = new StringEntity(msg, charset);
...@@ -155,27 +110,6 @@ public class DingTalkSender { ...@@ -155,27 +110,6 @@ public class DingTalkSender {
return JSONUtils.toJsonString(items); return JSONUtils.toJsonString(items);
} }
public static class DingTalkSendMsgResponse {
private Integer errcode;
private String errmsg;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
}
private static AlertResult checkSendDingTalkSendMsgResult(String result) { private static AlertResult checkSendDingTalkSendMsgResult(String result) {
AlertResult alertResult = new AlertResult(); AlertResult alertResult = new AlertResult();
alertResult.setStatus("false"); alertResult.setStatus("false");
...@@ -201,4 +135,70 @@ public class DingTalkSender { ...@@ -201,4 +135,70 @@ public class DingTalkSender {
return alertResult; return alertResult;
} }
public AlertResult sendDingTalkMsg(String title, String content) {
AlertResult alertResult;
try {
String resp = sendMsg(title, content);
return checkSendDingTalkSendMsgResult(resp);
} catch (Exception e) {
logger.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;
}
private String sendMsg(String title, String content) throws IOException {
String msgToJson = textToJsonString(title + content + "#" + keyword);
HttpPost httpPost = constructHttpPost(url, msgToJson, "UTF-8");
CloseableHttpClient httpClient;
if (Boolean.TRUE.equals(enableProxy)) {
httpClient = getProxyClient(proxy, port, user, password);
RequestConfig rcf = getProxyConfig(proxy, port);
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, "UTF-8");
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send title :{},content : {}, resp: {}", title, content, resp);
return resp;
} finally {
httpClient.close();
}
}
public static class DingTalkSendMsgResponse {
private Integer errcode;
private String errmsg;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
}
} }
...@@ -50,7 +50,7 @@ public class DingTalkSenderTest { ...@@ -50,7 +50,7 @@ public class DingTalkSenderTest {
dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8"); dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, "true"); dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, "true");
dingTalkSender = new DingTalkSender(dingTalkConfig); dingTalkSender = new DingTalkSender(dingTalkConfig);
AlertResult alertResult = dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8"); AlertResult alertResult = dingTalkSender.sendDingTalkMsg("title", "content test");
Assert.assertEquals("false",alertResult.getStatus()); Assert.assertEquals("false",alertResult.getStatus());
} }
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-alert-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert-feishu</artifactId>
<packaging>dolphinscheduler-plugin</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>dolphinscheduler-alert-feishu-${project.version}</finalName>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import java.util.Map;
public class FeiShuAlertChannel implements AlertChannel {
@Override
public AlertResult process(AlertInfo alertInfo) {
AlertData alertData = alertInfo.getAlertData();
String alertParams = alertInfo.getAlertParams();
Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(alertParams);
return new FeiShuSender(paramsMap).sendFeiShuMsg(alertData);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.params.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
import org.apache.dolphinscheduler.spi.params.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import java.util.Arrays;
import java.util.List;
public class FeiShuAlertChannelFactory implements AlertChannelFactory {
@Override
public String getName() {
return "Feishu";
}
@Override
public List<PluginParams> getParams() {
InputParam webHookParam = InputParam.newBuilder(FeiShuParamsConstants.NAME_WEB_HOOK, FeiShuParamsConstants.WEB_HOOK)
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
RadioParam isEnableProxy =
RadioParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PROXY_ENABLE, FeiShuParamsConstants.NAME_FEI_SHU_PROXY_ENABLE)
.addParamsOptions(new ParamsOptions("YES", true, false))
.addParamsOptions(new ParamsOptions("NO", false, false))
.setValue(true)
.addValidate(Validate.newBuilder()
.setRequired(false)
.build())
.build();
InputParam proxyParam =
InputParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PROXY, FeiShuParamsConstants.FEI_SHU_PROXY)
.addValidate(Validate.newBuilder()
.setRequired(false).build())
.build();
InputParam portParam = InputParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PORT, FeiShuParamsConstants.FEI_SHU_PORT)
.addValidate(Validate.newBuilder()
.setRequired(false).build())
.build();
InputParam userParam =
InputParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_USER, FeiShuParamsConstants.FEI_SHU_USER)
.addValidate(Validate.newBuilder()
.setRequired(false).build())
.build();
PasswordParam passwordParam = PasswordParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PASSWORD, FeiShuParamsConstants.FEI_SHU_PASSWORD)
.setPlaceholder("if enable use authentication, you need input password")
.build();
return Arrays.asList(webHookParam, isEnableProxy, proxyParam, portParam, userParam, passwordParam);
}
@Override
public AlertChannel create() {
return new FeiShuAlertChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import com.google.common.collect.ImmutableList;
public class FeiShuAlertPlugin implements DolphinSchedulerPlugin {
@Override
public Iterable<AlertChannelFactory> getAlertChannelFactorys() {
return ImmutableList.of(new FeiShuAlertChannelFactory());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
public class FeiShuParamsConstants {
private FeiShuParamsConstants() {
throw new IllegalStateException("Utility class");
}
static final String WEB_HOOK = "webhook";
static final String NAME_WEB_HOOK = "webHook";
public static final String FEI_SHU_PROXY_ENABLE = "isEnableProxy";
static final String NAME_FEI_SHU_PROXY_ENABLE = "isEnableProxy";
static final String FEI_SHU_PROXY = "proxy";
static final String NAME_FEI_SHU_PROXY = "proxy";
static final String FEI_SHU_PORT = "port";
static final String NAME_FEI_SHU_PORT = "port";
static final String FEI_SHU_USER = "user";
static final String NAME_FEI_SHU_USER = "user";
static final String FEI_SHU_PASSWORD = "password";
static final String NAME_FEI_SHU_PASSWORD = "password";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
public class FeiShuSender {
private static final Logger logger = LoggerFactory.getLogger(FeiShuSender.class);
private String url;
private Boolean enableProxy;
private String proxy;
private Integer port;
private String user;
private String password;
FeiShuSender(Map<String, String> config) {
url = config.get(FeiShuParamsConstants.NAME_WEB_HOOK);
enableProxy = Boolean.valueOf(config.get(FeiShuParamsConstants.NAME_FEI_SHU_PROXY_ENABLE));
if (Boolean.TRUE.equals(enableProxy)) {
port = Integer.parseInt(config.get(FeiShuParamsConstants.NAME_FEI_SHU_PORT));
proxy = config.get(FeiShuParamsConstants.NAME_FEI_SHU_PROXY);
user = config.get(FeiShuParamsConstants.NAME_FEI_SHU_USER);
password = config.get(FeiShuParamsConstants.NAME_FEI_SHU_PASSWORD);
}
}
private static RequestConfig getProxyConfig(String proxy, int port) {
HttpHost httpProxy = new HttpHost(proxy, port);
return RequestConfig.custom().setProxy(httpProxy).build();
}
private static String textToJsonString(AlertData alertData) {
Map<String, Object> items = new HashMap<>(2);
items.put("msg_type", "text");
Map<String, String> textContent = new HashMap<>();
byte[] byt = StringUtils.getBytesUtf8(formatContent(alertData));
String txt = StringUtils.newStringUtf8(byt);
textContent.put("text", txt);
items.put("content", textContent);
return JSONUtils.toJsonString(items);
}
private static AlertResult checkSendFeiShuSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
if (org.apache.dolphinscheduler.spi.utils.StringUtils.isBlank(result)) {
alertResult.setMessage("send fei shu msg error");
logger.info("send fei shu msg error,fei shu server resp is null");
return alertResult;
}
FeiShuSendMsgResponse sendMsgResponse = JSONUtils.parseObject(result, FeiShuSendMsgResponse.class);
if (null == sendMsgResponse) {
alertResult.setMessage("send fei shu msg fail");
logger.info("send fei shu msg error,resp error");
return alertResult;
}
if (sendMsgResponse.statusCode == 0) {
alertResult.setStatus("true");
alertResult.setMessage("send fei shu msg success");
return alertResult;
}
alertResult.setMessage(String.format("alert send fei shu msg error : %s", sendMsgResponse.getStatusMessage()));
logger.info("alert send fei shu msg error : {} ,Extra : {} ", sendMsgResponse.getStatusMessage(), sendMsgResponse.getExtra());
return alertResult;
}
public static String formatContent(AlertData alertData) {
if (alertData.getContent() != null) {
List<Map> list;
try {
list = JSONUtils.toList(alertData.getContent(), Map.class);
} catch (Exception e) {
logger.error("json format exception", e);
return null;
}
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`%n", alertData.getTitle()));
for (Map map : list) {
Iterator<Entry<String, Object>> entries = map.entrySet().iterator();
while (entries.hasNext()) {
Entry<String, Object> entry = entries.next();
String key = entry.getKey();
String value = entry.getValue().toString();
contents.append(key + ":" + value);
contents.append("\n");
}
}
return contents.toString();
}
return null;
}
public AlertResult sendFeiShuMsg(AlertData alertData) {
AlertResult alertResult;
try {
String resp = sendMsg(alertData);
return checkSendFeiShuSendMsgResult(resp);
} catch (Exception e) {
logger.info("send fei shu alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send fei shu alert fail.");
}
return alertResult;
}
private String sendMsg(AlertData alertData) throws IOException {
String msgToJson = textToJsonString(alertData);
HttpPost httpPost = HttpRequestUtil.constructHttpPost(url, msgToJson);
CloseableHttpClient httpClient;
httpClient = HttpRequestUtil.getHttpClient(enableProxy, proxy, port, user, password);
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
logger.error("send feishu message error, return http status code: " + statusCode);
}
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, "utf-8");
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send title :{} ,content :{}, resp: {}", alertData.getTitle(), alertData.getContent(), resp);
return resp;
} finally {
httpClient.close();
}
}
public static class FeiShuSendMsgResponse {
@JsonProperty("Extra")
private String extra;
@JsonProperty("StatusCode")
private Integer statusCode;
@JsonProperty("StatusMessage")
private String statusMessage;
public String getExtra() {
return extra;
}
public void setExtra(String extra) {
this.extra = extra;
}
public Integer getStatusCode() {
return statusCode;
}
public void setStatusCode(Integer statusCode) {
this.statusCode = statusCode;
}
public String getStatusMessage() {
return statusMessage;
}
public void setStatusMessage(String statusMessage) {
this.statusMessage = statusMessage;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
public class HttpRequestUtil {
public static CloseableHttpClient getHttpClient(boolean enableProxy, String proxy, Integer port, String user, String password) {
if (enableProxy) {
HttpHost httpProxy = new HttpHost(proxy, port);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, password));
return HttpClients.custom().setDefaultCredentialsProvider(provider).build();
} else {
return HttpClients.createDefault();
}
}
public static HttpPost constructHttpPost(String url, String msg) {
HttpPost post = new HttpPost(url);
StringEntity entity = new StringEntity(msg, ContentType.APPLICATION_JSON);
post.setEntity(entity);
return post;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class FeiShuAlertChannelFactoryTest {
@Test
public void testGetParams() {
FeiShuAlertChannelFactory feiShuAlertChannelFactory = new FeiShuAlertChannelFactory();
List<PluginParams> params = feiShuAlertChannelFactory.getParams();
JSONUtils.toJsonString(params);
Assert.assertEquals(6, params.size());
}
@Test
public void testCreate() {
FeiShuAlertChannelFactory feiShuAlertChannelFactory = new FeiShuAlertChannelFactory();
AlertChannel alertChannel = feiShuAlertChannelFactory.create();
Assert.assertNotNull(alertChannel);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class FeiShuSenderTest {
private static Map<String, String> feiShuConfig = new HashMap<>();
@Before
public void initFeiShuConfig() {
feiShuConfig.put(FeiShuParamsConstants.WEB_HOOK, "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx");
}
@Test
public void testSend() {
AlertData alertData = new AlertData();
alertData.setTitle("feishu test title");
alertData.setContent("feishu test content");
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.sendFeiShuMsg(alertData);
Assert.assertEquals("false", alertResult.getStatus());
}
@Test
public void testFormatContent() {
String alertMsg = "[\n"
+ " {\n"
+ " \"owner\": \"dolphinscheduler\",\n"
+ " \"processEndTime\": \"2021-01-29 19:01:11\",\n"
+ " \"processHost\": \"10.81.129.4:5678\",\n"
+ " \"processId\": 2926,\n"
+ " \"processName\": \"3-20210129190038108\",\n"
+ " \"processStartTime\": \"2021-01-29 19:00:38\",\n"
+ " \"processState\": \"SUCCESS\",\n"
+ " \"processType\": \"START_PROCESS\",\n"
+ " \"projectId\": 2,\n"
+ " \"projectName\": \"testdelproject\",\n"
+ " \"recovery\": \"NO\",\n"
+ " \"retryTimes\": 0,\n"
+ " \"runTimes\": 1,\n"
+ " \"taskId\": 0\n"
+ " }\n"
+ "]";
AlertData alertData = new AlertData();
alertData.setTitle("");
alertData.setContent(alertMsg);
Assert.assertNotNull(FeiShuSender.formatContent(alertData));
}
}
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
<module>dolphinscheduler-alert-dingtalk</module> <module>dolphinscheduler-alert-dingtalk</module>
<module>dolphinscheduler-alert-script</module> <module>dolphinscheduler-alert-script</module>
<module>dolphinscheduler-alert-http</module> <module>dolphinscheduler-alert-http</module>
<module>dolphinscheduler-alert-feishu</module>
</modules> </modules>
......
...@@ -153,19 +153,21 @@ public class WorkerGroupService extends BaseService { ...@@ -153,19 +153,21 @@ public class WorkerGroupService extends BaseService {
} }
} }
// available workerGroup list
List<String> availableWorkerGroupList = new ArrayList<>();
for (String workerGroup : workerGroupList) { for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + "/" + workerGroup; String workerGroupPath = workerPath + "/" + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
String timeStamp = "";
for (int i = 0; i < childrenNodes.size(); i++) {
String ip = childrenNodes.get(i);
childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":")));
timeStamp = ip.substring(ip.lastIndexOf(":"));
}
if (CollectionUtils.isNotEmpty(childrenNodes)) { if (CollectionUtils.isNotEmpty(childrenNodes)) {
availableWorkerGroupList.add(workerGroup);
WorkerGroup wg = new WorkerGroup(); WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup); wg.setName(workerGroup);
if (isPaging) { if (isPaging) {
wg.setIpList(childrenNodes); wg.setIpList(childrenNodes);
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0)); String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp);
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
} }
......
...@@ -61,4 +61,8 @@ public class StringUtils { ...@@ -61,4 +61,8 @@ public class StringUtils {
public static String trim(String str) { public static String trim(String str) {
return str == null ? null : str.trim(); return str == null ? null : str.trim();
} }
public static boolean equalsIgnoreCase(String str1, String str2) {
return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
}
} }
...@@ -62,7 +62,7 @@ yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s ...@@ -62,7 +62,7 @@ yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) # job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# system env path # system env path, If you want to set your own path, you need to set this env file to an absolute path
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh #dolphinscheduler.env.path=env/dolphinscheduler_env.sh
development.state=false development.state=false
......
MIT License
Copyright (c) 2018 xaboy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
...@@ -64,4 +64,9 @@ ...@@ -64,4 +64,9 @@
<unpack/> <unpack/>
</artifact> </artifact>
</artifactSet> </artifactSet>
<artifactSet to="lib/plugin/alert/feishu">
<artifact id="${project.groupId}:dolphinscheduler-alert-feishu:zip:${project.version}">
<unpack/>
</artifact>
</artifactSet>
</runtime> </runtime>
\ No newline at end of file
...@@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory; ...@@ -55,7 +55,6 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
...@@ -84,7 +83,7 @@ public class NettyRemotingClient { ...@@ -84,7 +83,7 @@ public class NettyRemotingClient {
/** /**
* channels * channels
*/ */
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128); private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
/** /**
* started flag * started flag
...@@ -130,7 +129,7 @@ public class NettyRemotingClient { ...@@ -130,7 +129,7 @@ public class NettyRemotingClient {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
if (NettyUtils.useEpoll()) { if (NettyUtils.useEpoll()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -139,7 +138,7 @@ public class NettyRemotingClient { ...@@ -139,7 +138,7 @@ public class NettyRemotingClient {
}); });
} else { } else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -148,8 +147,8 @@ public class NettyRemotingClient { ...@@ -148,8 +147,8 @@ public class NettyRemotingClient {
}); });
} }
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy()); new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
...@@ -163,40 +162,32 @@ public class NettyRemotingClient { ...@@ -163,40 +162,32 @@ public class NettyRemotingClient {
private void start() { private void start() {
this.bootstrap this.bootstrap
.group(this.workerGroup) .group(this.workerGroup)
.channel(NettyUtils.getSocketChannelClass()) .channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) { public void initChannel(SocketChannel ch) {
ch.pipeline() ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS)) .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyDecoder(), clientHandler, encoder); .addLast(new NettyDecoder(), clientHandler, encoder);
} }
}); });
this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
@Override
public void run() {
ResponseFuture.scanFutureTable();
}
}, 5000, 1000, TimeUnit.MILLISECONDS);
//
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
/** /**
* async send * async send
* *
* @param host host * @param host host
* @param command command * @param command command
* @param timeoutMillis timeoutMillis * @param timeoutMillis timeoutMillis
* @param invokeCallback callback function * @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
*/ */
public void sendAsync(final Host host, final Command command, public void sendAsync(final Host host, final Command command,
final long timeoutMillis, final long timeoutMillis,
...@@ -205,53 +196,49 @@ public class NettyRemotingClient { ...@@ -205,53 +196,49 @@ public class NettyRemotingClient {
if (channel == null) { if (channel == null) {
throw new RemotingException("network error"); throw new RemotingException("network error");
} }
/** /*
* request unique identification * request unique identification
*/ */
final long opaque = command.getOpaque(); final long opaque = command.getOpaque();
/** /*
* control concurrency number * control concurrency number
*/ */
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) { if (acquired) {
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
/** /*
* response future * response future
*/ */
final ResponseFuture responseFuture = new ResponseFuture(opaque, final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, timeoutMillis,
invokeCallback, invokeCallback,
releaseSemaphore); releaseSemaphore);
try { try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener() { channel.writeAndFlush(command).addListener(future -> {
if (future.isSuccess()) {
@Override responseFuture.setSendOk(true);
public void operationComplete(ChannelFuture future) throws Exception { return;
if (future.isSuccess()) { } else {
responseFuture.setSendOk(true); responseFuture.setSendOk(false);
return; }
} else { responseFuture.setCause(future.cause());
responseFuture.setSendOk(false); responseFuture.putResponse(null);
} try {
responseFuture.setCause(future.cause()); responseFuture.executeInvokeCallback();
responseFuture.putResponse(null); } catch (Exception ex) {
try { logger.error("execute callback error", ex);
responseFuture.executeInvokeCallback(); } finally {
} catch (Throwable ex) { responseFuture.release();
logger.error("execute callback error", ex);
} finally {
responseFuture.release();
}
} }
}); });
} catch (Throwable ex) { } catch (Exception ex) {
responseFuture.release(); responseFuture.release();
throw new RemotingException(String.format("send command to host: %s failed", host), ex); throw new RemotingException(String.format("send command to host: %s failed", host), ex);
} }
} else { } else {
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message); throw new RemotingTooMuchRequestException(message);
} }
} }
...@@ -259,12 +246,10 @@ public class NettyRemotingClient { ...@@ -259,12 +246,10 @@ public class NettyRemotingClient {
/** /**
* sync send * sync send
* *
* @param host host * @param host host
* @param command command * @param command command
* @param timeoutMillis timeoutMillis * @param timeoutMillis timeoutMillis
* @return command * @return command
* @throws InterruptedException
* @throws RemotingException
*/ */
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException { public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host); final Channel channel = getChannel(host);
...@@ -273,21 +258,18 @@ public class NettyRemotingClient { ...@@ -273,21 +258,18 @@ public class NettyRemotingClient {
} }
final long opaque = command.getOpaque(); final long opaque = command.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
channel.writeAndFlush(command).addListener(new ChannelFutureListener() { channel.writeAndFlush(command).addListener(future -> {
@Override if (future.isSuccess()) {
public void operationComplete(ChannelFuture future) throws Exception { responseFuture.setSendOk(true);
if (future.isSuccess()) { return;
responseFuture.setSendOk(true); } else {
return; responseFuture.setSendOk(false);
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to host {} failed", command, host);
} }
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to host {} failed", command, host);
}); });
/** /*
* sync wait for result * sync wait for result
*/ */
Command result = responseFuture.waitResponse(); Command result = responseFuture.waitResponse();
...@@ -304,9 +286,8 @@ public class NettyRemotingClient { ...@@ -304,9 +286,8 @@ public class NettyRemotingClient {
/** /**
* send task * send task
* *
* @param host host * @param host host
* @param command command * @param command command
* @throws RemotingException
*/ */
public void send(final Host host, final Command command) throws RemotingException { public void send(final Host host, final Command command) throws RemotingException {
Channel channel = getChannel(host); Channel channel = getChannel(host);
...@@ -332,7 +313,7 @@ public class NettyRemotingClient { ...@@ -332,7 +313,7 @@ public class NettyRemotingClient {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
...@@ -342,8 +323,8 @@ public class NettyRemotingClient { ...@@ -342,8 +323,8 @@ public class NettyRemotingClient {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor); this.clientHandler.registerProcessor(commandType, processor, executor);
...@@ -351,9 +332,6 @@ public class NettyRemotingClient { ...@@ -351,9 +332,6 @@ public class NettyRemotingClient {
/** /**
* get channel * get channel
*
* @param host
* @return
*/ */
public Channel getChannel(Host host) { public Channel getChannel(Host host) {
Channel channel = channels.get(host); Channel channel = channels.get(host);
...@@ -366,7 +344,7 @@ public class NettyRemotingClient { ...@@ -366,7 +344,7 @@ public class NettyRemotingClient {
/** /**
* create channel * create channel
* *
* @param host host * @param host host
* @param isSync sync flag * @param isSync sync flag
* @return channel * @return channel
*/ */
......
...@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.remote.codec.NettyDecoder; ...@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemoteException;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
...@@ -93,6 +94,11 @@ public class NettyRemotingServer { ...@@ -93,6 +94,11 @@ public class NettyRemotingServer {
*/ */
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* Netty server bind fail message
*/
private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail";
/** /**
* server init * server init
* *
...@@ -102,7 +108,7 @@ public class NettyRemotingServer { ...@@ -102,7 +108,7 @@ public class NettyRemotingServer {
this.serverConfig = serverConfig; this.serverConfig = serverConfig;
if (NettyUtils.useEpoll()) { if (NettyUtils.useEpoll()) {
this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -111,7 +117,7 @@ public class NettyRemotingServer { ...@@ -111,7 +117,7 @@ public class NettyRemotingServer {
}); });
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -120,7 +126,7 @@ public class NettyRemotingServer { ...@@ -120,7 +126,7 @@ public class NettyRemotingServer {
}); });
} else { } else {
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -129,7 +135,7 @@ public class NettyRemotingServer { ...@@ -129,7 +135,7 @@ public class NettyRemotingServer {
}); });
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
...@@ -145,35 +151,35 @@ public class NettyRemotingServer { ...@@ -145,35 +151,35 @@ public class NettyRemotingServer {
public void start() { public void start() {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap this.serverBootstrap
.group(this.bossGroup, this.workGroup) .group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass()) .channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) {
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
ChannelFuture future; ChannelFuture future;
try { try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) { } catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
} }
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) { } else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else { } else {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
} }
} }
} }
...@@ -185,17 +191,17 @@ public class NettyRemotingServer { ...@@ -185,17 +191,17 @@ public class NettyRemotingServer {
*/ */
private void initNettyChannel(SocketChannel ch) { private void initNettyChannel(SocketChannel ch) {
ch.pipeline() ch.pipeline()
.addLast("encoder", encoder) .addLast("encoder", encoder)
.addLast("decoder", new NettyDecoder()) .addLast("decoder", new NettyDecoder())
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS)) .addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", serverHandler); .addLast("handler", serverHandler);
} }
/** /**
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
...@@ -205,8 +211,8 @@ public class NettyRemotingServer { ...@@ -205,8 +211,8 @@ public class NettyRemotingServer {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor); this.serverHandler.registerProcessor(commandType, processor, executor);
......
...@@ -14,16 +14,19 @@ ...@@ -14,16 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.codec; package org.apache.dolphinscheduler.remote.codec;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
/** /**
* netty encoder * netty encoder
*/ */
@Sharable @Sharable
public class NettyEncoder extends MessageToByteEncoder<Command> { public class NettyEncoder extends MessageToByteEncoder<Command> {
...@@ -34,12 +37,11 @@ public class NettyEncoder extends MessageToByteEncoder<Command> { ...@@ -34,12 +37,11 @@ public class NettyEncoder extends MessageToByteEncoder<Command> {
* @param ctx channel handler context * @param ctx channel handler context
* @param msg command * @param msg command
* @param out byte buffer * @param out byte buffer
* @throws Exception
*/ */
@Override @Override
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
if(msg == null){ if (msg == null) {
throw new Exception("encode msg is null"); throw new RemotingException("encode msg is null");
} }
out.writeByte(Command.MAGIC); out.writeByte(Command.MAGIC);
out.writeByte(Command.VERSION); out.writeByte(Command.VERSION);
......
...@@ -23,7 +23,7 @@ import java.io.Serializable; ...@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
* kill task response command * kill task response command
*/ */
public class TaskKillResponseCommand implements Serializable { public class TaskKillResponseCommand implements Serializable {
...@@ -51,7 +51,7 @@ public class TaskKillResponseCommand implements Serializable { ...@@ -51,7 +51,7 @@ public class TaskKillResponseCommand implements Serializable {
/** /**
* other resource manager appId , for example : YARN etc * other resource manager appId , for example : YARN etc
*/ */
protected List<String> appIds; private List<String> appIds;
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
...@@ -94,7 +94,7 @@ public class TaskKillResponseCommand implements Serializable { ...@@ -94,7 +94,7 @@ public class TaskKillResponseCommand implements Serializable {
} }
/** /**
* package request command * package request command
* *
* @return command * @return command
*/ */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* Custom runtime exception
*/
public class RemoteException extends RuntimeException {
/**
* Construct a new runtime exception with the detail message
*
* @param message detail message
*/
public RemoteException(String message) {
super(message);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param message detail message
*/
public RemoteException(String message, Throwable cause) {
super(message, cause);
}
}
...@@ -18,14 +18,17 @@ ...@@ -18,14 +18,17 @@
package org.apache.dolphinscheduler.remote.future; package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* response future * response future
...@@ -34,25 +37,25 @@ public class ResponseFuture { ...@@ -34,25 +37,25 @@ public class ResponseFuture {
private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class); private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFuture.class);
private static final ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256); private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
/** /**
* request unique identification * request unique identification
*/ */
private final long opaque; private final long opaque;
/** /**
* timeout * timeout
*/ */
private final long timeoutMillis; private final long timeoutMillis;
/** /**
* invokeCallback function * invokeCallback function
*/ */
private final InvokeCallback invokeCallback; private final InvokeCallback invokeCallback;
/** /**
* releaseSemaphore * releaseSemaphore
*/ */
private final ReleaseSemaphore releaseSemaphore; private final ReleaseSemaphore releaseSemaphore;
...@@ -61,7 +64,7 @@ public class ResponseFuture { ...@@ -61,7 +64,7 @@ public class ResponseFuture {
private final long beginTimestamp = System.currentTimeMillis(); private final long beginTimestamp = System.currentTimeMillis();
/** /**
* response command * response command
*/ */
private Command responseCommand; private Command responseCommand;
...@@ -78,10 +81,9 @@ public class ResponseFuture { ...@@ -78,10 +81,9 @@ public class ResponseFuture {
} }
/** /**
* wait for response * wait for response
* *
* @return command * @return command
* @throws InterruptedException
*/ */
public Command waitResponse() throws InterruptedException { public Command waitResponse() throws InterruptedException {
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
...@@ -89,7 +91,7 @@ public class ResponseFuture { ...@@ -89,7 +91,7 @@ public class ResponseFuture {
} }
/** /**
* put response * put response
* *
* @param responseCommand responseCommand * @param responseCommand responseCommand
*/ */
...@@ -99,12 +101,13 @@ public class ResponseFuture { ...@@ -99,12 +101,13 @@ public class ResponseFuture {
FUTURE_TABLE.remove(opaque); FUTURE_TABLE.remove(opaque);
} }
public static ResponseFuture getFuture(long opaque){ public static ResponseFuture getFuture(long opaque) {
return FUTURE_TABLE.get(opaque); return FUTURE_TABLE.get(opaque);
} }
/** /**
* whether timeout * whether timeout
*
* @return timeout * @return timeout
*/ */
public boolean isTimeout() { public boolean isTimeout() {
...@@ -113,7 +116,7 @@ public class ResponseFuture { ...@@ -113,7 +116,7 @@ public class ResponseFuture {
} }
/** /**
* execute invoke callback * execute invoke callback
*/ */
public void executeInvokeCallback() { public void executeInvokeCallback() {
if (invokeCallback != null) { if (invokeCallback != null) {
...@@ -162,10 +165,10 @@ public class ResponseFuture { ...@@ -162,10 +165,10 @@ public class ResponseFuture {
} }
/** /**
* release * release
*/ */
public void release() { public void release() {
if(this.releaseSemaphore != null){ if (this.releaseSemaphore != null) {
this.releaseSemaphore.release(); this.releaseSemaphore.release();
} }
} }
...@@ -173,7 +176,7 @@ public class ResponseFuture { ...@@ -173,7 +176,7 @@ public class ResponseFuture {
/** /**
* scan future table * scan future table
*/ */
public static void scanFutureTable(){ public static void scanFutureTable() {
final List<ResponseFuture> futureList = new LinkedList<>(); final List<ResponseFuture> futureList = new LinkedList<>();
Iterator<Map.Entry<Long, ResponseFuture>> it = FUTURE_TABLE.entrySet().iterator(); Iterator<Map.Entry<Long, ResponseFuture>> it = FUTURE_TABLE.entrySet().iterator();
while (it.hasNext()) { while (it.hasNext()) {
...@@ -189,7 +192,7 @@ public class ResponseFuture { ...@@ -189,7 +192,7 @@ public class ResponseFuture {
try { try {
future.release(); future.release();
future.executeInvokeCallback(); future.executeInvokeCallback();
} catch (Throwable ex) { } catch (Exception ex) {
LOGGER.warn("scanFutureTable, execute callback error", ex); LOGGER.warn("scanFutureTable, execute callback error", ex);
} }
} }
...@@ -197,16 +200,16 @@ public class ResponseFuture { ...@@ -197,16 +200,16 @@ public class ResponseFuture {
@Override @Override
public String toString() { public String toString() {
return "ResponseFuture{" + return "ResponseFuture{"
"opaque=" + opaque + + "opaque=" + opaque
", timeoutMillis=" + timeoutMillis + + ", timeoutMillis=" + timeoutMillis
", invokeCallback=" + invokeCallback + + ", invokeCallback=" + invokeCallback
", releaseSemaphore=" + releaseSemaphore + + ", releaseSemaphore=" + releaseSemaphore
", latch=" + latch + + ", latch=" + latch
", beginTimestamp=" + beginTimestamp + + ", beginTimestamp=" + beginTimestamp
", responseCommand=" + responseCommand + + ", responseCommand=" + responseCommand
", sendOk=" + sendOk + + ", sendOk=" + sendOk
", cause=" + cause + + ", cause=" + cause
'}'; + '}';
} }
} }
...@@ -74,7 +74,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -74,7 +74,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor; this.callbackExecutor = callbackExecutor;
this.processors = new ConcurrentHashMap(); this.processors = new ConcurrentHashMap<>();
} }
/** /**
...@@ -82,10 +82,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -82,10 +82,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* the current channel has reached the end of its life cycle * the current channel has reached the end of its life cycle
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception
*/ */
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) {
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
...@@ -95,10 +94,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -95,10 +94,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param msg message * @param msg message
* @throws Exception
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (Command) msg); processReceived(ctx.channel(), (Command) msg);
} }
...@@ -106,7 +104,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -106,7 +104,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
...@@ -116,8 +114,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -116,8 +114,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor; ExecutorService executorRef = executor;
...@@ -138,12 +136,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -138,12 +136,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
future.setResponseCommand(command); future.setResponseCommand(command);
future.release(); future.release();
if (future.getInvokeCallback() != null) { if (future.getInvokeCallback() != null) {
this.callbackExecutor.submit(new Runnable() { this.callbackExecutor.submit(future::executeInvokeCallback);
@Override
public void run() {
future.executeInvokeCallback();
}
});
} else { } else {
future.putResponse(command); future.putResponse(command);
} }
...@@ -158,7 +151,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -158,7 +151,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
Runnable run = () -> { Runnable run = () -> {
try { try {
pair.getLeft().process(channel, command); pair.getLeft().process(channel, command);
} catch (Throwable e) { } catch (Exception e) {
logger.error(String.format("process command %s exception", command), e); logger.error(String.format("process command %s exception", command), e);
} }
}; };
...@@ -175,13 +168,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -175,13 +168,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/** /**
* caught exception * caught exception
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param cause cause * @param cause cause
* @throws Exception
*/ */
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("exceptionCaught : {}", cause); logger.error("exceptionCaught : {}", cause.getMessage(), cause);
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
...@@ -193,11 +185,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -193,11 +185,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
heartBeat.setType(CommandType.HEART_BEAT); heartBeat.setType(CommandType.HEART_BEAT);
heartBeat.setBody(heartBeatData); heartBeat.setBody(heartBeatData);
ctx.writeAndFlush(heartBeat) ctx.writeAndFlush(heartBeat)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else { } else {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }
} }
} }
\ No newline at end of file
...@@ -55,7 +55,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -55,7 +55,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** /**
* server processors queue * server processors queue
*/ */
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap(); private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();
public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer; this.nettyRemotingServer = nettyRemotingServer;
...@@ -66,10 +66,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -66,10 +66,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* the current channel has reached the end of its life cycle * the current channel has reached the end of its life cycle
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception
*/ */
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) {
ctx.channel().close(); ctx.channel().close();
} }
...@@ -78,10 +77,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -78,10 +77,9 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param msg message * @param msg message
* @throws Exception
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (Command) msg); processReceived(ctx.channel(), (Command) msg);
} }
...@@ -89,7 +87,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -89,7 +87,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null); this.registerProcessor(commandType, processor, null);
...@@ -99,8 +97,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -99,8 +97,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* register processor * register processor
* *
* @param commandType command type * @param commandType command type
* @param processor processor * @param processor processor
* @param executor thread executor * @param executor thread executor
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor; ExecutorService executorRef = executor;
...@@ -114,21 +112,17 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -114,21 +112,17 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* process received logic * process received logic
* *
* @param channel channel * @param channel channel
* @param msg message * @param msg message
*/ */
private void processReceived(final Channel channel, final Command msg) { private void processReceived(final Channel channel, final Command msg) {
final CommandType commandType = msg.getType(); final CommandType commandType = msg.getType();
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType); final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) { if (pair != null) {
Runnable r = new Runnable() { Runnable r = () -> {
try {
@Override pair.getLeft().process(channel, msg);
public void run() { } catch (Exception ex) {
try { logger.error("process msg {} error", msg, ex);
pair.getLeft().process(channel, msg);
} catch (Throwable ex) {
logger.error("process msg {} error", msg, ex);
}
} }
}; };
try { try {
...@@ -144,9 +138,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -144,9 +138,8 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** /**
* caught exception * caught exception
* *
* @param ctx channel handler context * @param ctx channel handler context
* @param cause cause * @param cause cause
* @throws Exception
*/ */
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
...@@ -158,7 +151,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -158,7 +151,6 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
* channel write changed * channel write changed
* *
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception
*/ */
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
...@@ -168,14 +160,14 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -168,14 +160,14 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (!ch.isWritable()) { if (!ch.isWritable()) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, over high water level : {}", logger.warn("{} is not writable, over high water level : {}",
ch, config.getWriteBufferHighWaterMark()); ch, config.getWriteBufferHighWaterMark());
} }
config.setAutoRead(false); config.setAutoRead(false);
} else { } else {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("{} is writable, to low water : {}", logger.warn("{} is writable, to low water : {}",
ch, config.getWriteBufferLowWaterMark()); ch, config.getWriteBufferLowWaterMark());
} }
config.setAutoRead(true); config.setAutoRead(true);
} }
...@@ -189,4 +181,4 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { ...@@ -189,4 +181,4 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }
} }
} }
\ No newline at end of file
...@@ -14,43 +14,50 @@ ...@@ -14,43 +14,50 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.utils;
import io.netty.channel.Channel; package org.apache.dolphinscheduler.remote.utils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import io.netty.channel.Channel;
/** /**
* channel utils * channel utils
*/ */
public class ChannelUtils { public class ChannelUtils {
private ChannelUtils() {
throw new IllegalStateException(ChannelUtils.class.getName());
}
/** /**
* get local address * get local address
* *
* @param channel channel * @param channel channel
* @return local address * @return local address
*/ */
public static String getLocalAddress(Channel channel){ public static String getLocalAddress(Channel channel) {
return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress(); return ((InetSocketAddress) channel.localAddress()).getAddress().getHostAddress();
} }
/** /**
* get remote address * get remote address
*
* @param channel channel * @param channel channel
* @return remote address * @return remote address
*/ */
public static String getRemoteAddress(Channel channel){ public static String getRemoteAddress(Channel channel) {
return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress(); return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
} }
/** /**
* channel to address * channel to address
*
* @param channel channel * @param channel channel
* @return address * @return address
*/ */
public static Host toAddress(Channel channel){ public static Host toAddress(Channel channel) {
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress()); InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress());
return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort()); return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
} }
......
...@@ -25,6 +25,10 @@ import java.nio.charset.StandardCharsets; ...@@ -25,6 +25,10 @@ import java.nio.charset.StandardCharsets;
*/ */
public class Constants { public class Constants {
private Constants() {
throw new IllegalStateException(Constants.class.getName());
}
public static final String COMMA = ","; public static final String COMMA = ",";
public static final String SLASH = "/"; public static final String SLASH = "/";
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
*/ */
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import org.apache.dolphinscheduler.remote.exceptions.RemoteException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -28,22 +30,24 @@ import java.util.regex.Pattern; ...@@ -28,22 +30,24 @@ import java.util.regex.Pattern;
public class IPUtils { public class IPUtils {
private static final Logger logger = LoggerFactory.getLogger(IPUtils.class); private IPUtils() {
throw new IllegalStateException(IPUtils.class.getName());
}
private static String IP_REGEX = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}"; private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);
private static String LOCAL_HOST = "unknown"; private static String localHost = "unknown";
static { static {
String host = System.getenv("HOSTNAME"); String host = System.getenv("HOSTNAME");
if (isNotEmpty(host)) { if (isNotEmpty(host)) {
LOCAL_HOST = host; localHost = host;
} else { } else {
try { try {
String hostName = InetAddress.getLocalHost().getHostName(); String hostName = InetAddress.getLocalHost().getHostName();
if (isNotEmpty(hostName)) { if (isNotEmpty(hostName)) {
LOCAL_HOST = hostName; localHost = hostName;
} }
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
logger.error("get hostName error!", e); logger.error("get hostName error!", e);
...@@ -52,7 +56,7 @@ public class IPUtils { ...@@ -52,7 +56,7 @@ public class IPUtils {
} }
public static String getLocalHost() { public static String getLocalHost() {
return LOCAL_HOST; return localHost;
} }
...@@ -100,7 +104,7 @@ public class IPUtils { ...@@ -100,7 +104,7 @@ public class IPUtils {
return addresses; return addresses;
} catch (SocketException e) { } catch (SocketException e) {
throw new RuntimeException(e.getMessage(), e); throw new RemoteException(e.getMessage(), e);
} }
} }
...@@ -131,12 +135,11 @@ public class IPUtils { ...@@ -131,12 +135,11 @@ public class IPUtils {
return false; return false;
} }
Pattern pat = Pattern.compile(IP_REGEX); String ipRegex = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}";
Pattern pat = Pattern.compile(ipRegex);
Matcher mat = pat.matcher(addr); Matcher mat = pat.matcher(addr);
boolean ipAddress = mat.find(); return mat.find();
return ipAddress;
} }
} }
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
/** /**
* key value pair * key value pair
* *
...@@ -51,7 +50,7 @@ public class Pair<L, R> { ...@@ -51,7 +50,7 @@ public class Pair<L, R> {
this.right = right; this.right = right;
} }
public static <L, R> Pair of(L left, R right){ public static <L, R> Pair<L, R> of(L left, R right) {
return new Pair(left, right); return new Pair<>(left, right);
} }
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.bean; package org.apache.dolphinscheduler.service.bean;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
...@@ -31,9 +32,7 @@ public class SpringApplicationContext implements ApplicationContextAware { ...@@ -31,9 +32,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
SpringApplicationContext.applicationContext = applicationContext; SpringApplicationContext.applicationContext = applicationContext;
} }
public static <T> T getBean(Class<T> requiredType){ public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType); return applicationContext.getBean(requiredType);
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
/**
* Custom ZKServerException exception
*/
public class ServiceException extends RuntimeException {
/**
* Construct a new runtime exception with the error message
*
* @param errMsg Error message
*/
public ServiceException(String errMsg) {
super(errMsg);
}
/**
* Construct a new runtime exception with the cause
*
* @param cause cause
*/
public ServiceException(Throwable cause) {
super(cause);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param errMsg message
* @param cause cause
*/
public ServiceException(String errMsg, Throwable cause) {
super(errMsg, cause);
}
}
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.permission; package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
...@@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType; ...@@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.List; import java.util.List;
import org.slf4j.Logger;
public class PermissionCheck<T> { public class PermissionCheck<T> {
/** /**
* logger * logger
...@@ -58,8 +61,9 @@ public class PermissionCheck<T> { ...@@ -58,8 +61,9 @@ public class PermissionCheck<T> {
/** /**
* permission check * permission check
*
* @param authorizationType authorization type * @param authorizationType authorization type
* @param processService process dao * @param processService process dao
*/ */
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService) { public PermissionCheck(AuthorizationType authorizationType, ProcessService processService) {
this.authorizationType = authorizationType; this.authorizationType = authorizationType;
...@@ -68,10 +72,6 @@ public class PermissionCheck<T> { ...@@ -68,10 +72,6 @@ public class PermissionCheck<T> {
/** /**
* permission check * permission check
* @param authorizationType
* @param processService
* @param needChecks
* @param userId
*/ */
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId) { public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId) {
this.authorizationType = authorizationType; this.authorizationType = authorizationType;
...@@ -82,11 +82,6 @@ public class PermissionCheck<T> { ...@@ -82,11 +82,6 @@ public class PermissionCheck<T> {
/** /**
* permission check * permission check
* @param authorizationType
* @param processService
* @param needChecks
* @param userId
* @param logger
*/ */
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId, Logger logger) { public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId, Logger logger) {
this.authorizationType = authorizationType; this.authorizationType = authorizationType;
...@@ -98,13 +93,8 @@ public class PermissionCheck<T> { ...@@ -98,13 +93,8 @@ public class PermissionCheck<T> {
/** /**
* permission check * permission check
* @param logger
* @param authorizationType
* @param processService
* @param resourceList
* @param userId
*/ */
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) { public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId, Logger logger) {
this.authorizationType = authorizationType; this.authorizationType = authorizationType;
this.processService = processService; this.processService = processService;
this.resourceList = resourceList; this.resourceList = resourceList;
...@@ -154,9 +144,10 @@ public class PermissionCheck<T> { ...@@ -154,9 +144,10 @@ public class PermissionCheck<T> {
/** /**
* has permission * has permission
*
* @return true if has permission * @return true if has permission
*/ */
public boolean hasPermission(){ public boolean hasPermission() {
try { try {
checkPermission(); checkPermission();
return true; return true;
...@@ -167,23 +158,24 @@ public class PermissionCheck<T> { ...@@ -167,23 +158,24 @@ public class PermissionCheck<T> {
/** /**
* check permission * check permission
* @throws Exception exception *
* @throws ServiceException exception
*/ */
public void checkPermission() throws Exception{ public void checkPermission() throws ServiceException {
if(this.needChecks.length > 0){ if (this.needChecks.length > 0) {
// get user type in order to judge whether the user is admin // get user type in order to judge whether the user is admin
User user = processService.getUserById(userId); User user = processService.getUserById(userId);
if (user == null) { if (user == null) {
logger.error("user id {} didn't exist",userId); logger.error("user id {} doesn't exist", userId);
throw new RuntimeException(String.format("user %s didn't exist",userId)); throw new ServiceException(String.format("user %s doesn't exist", userId));
} }
if (user.getUserType() != UserType.ADMIN_USER){ if (user.getUserType() != UserType.ADMIN_USER) {
List<T> unauthorizedList = processService.listUnauthorized(userId,needChecks,authorizationType); List<T> unauthorizedList = processService.listUnauthorized(userId, needChecks, authorizationType);
// if exist unauthorized resource // if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){ if (CollectionUtils.isNotEmpty(unauthorizedList)) {
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList); logger.error("user {} doesn't have permission of {}: {}", user.getUserName(), authorizationType.getDescp(), unauthorizedList);
throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0))); throw new ServiceException(String.format("user %s doesn't have permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
} }
} }
} }
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz;
package org.apache.dolphinscheduler.service.quartz;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
...@@ -25,6 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; ...@@ -25,6 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.quartz.Job; import org.quartz.Job;
import org.quartz.JobDataMap; import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
...@@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory; ...@@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.Date;
/** /**
* process schedule job * process schedule job
*/ */
...@@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job { ...@@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job {
*/ */
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class); private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
public ProcessService getProcessService(){ public ProcessService getProcessService() {
return SpringApplicationContext.getBean(ProcessService.class); return SpringApplicationContext.getBean(ProcessService.class);
} }
...@@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job { ...@@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job {
int projectId = dataMap.getInt(Constants.PROJECT_ID); int projectId = dataMap.getInt(Constants.PROJECT_ID);
int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID); int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID);
Date scheduledFireTime = context.getScheduledFireTime(); Date scheduledFireTime = context.getScheduledFireTime();
Date fireTime = context.getFireTime(); Date fireTime = context.getFireTime();
logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId); logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId);
...@@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job { ...@@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job {
return; return;
} }
ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId()); ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline // release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState(); ReleaseState releaseState = processDefinition.getReleaseState();
if (processDefinition == null || releaseState == ReleaseState.OFFLINE) { if (releaseState == ReleaseState.OFFLINE) {
logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId); logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
return; return;
} }
...@@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job { ...@@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job {
getProcessService().createCommand(command); getProcessService().createCommand(command);
} }
/** /**
* delete job * delete job
*/ */
......
...@@ -14,159 +14,177 @@ ...@@ -14,159 +14,177 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.quartz.cron; package org.apache.dolphinscheduler.service.quartz.cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import com.cronutils.model.field.CronField; import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName; import com.cronutils.model.field.CronFieldName;
import com.cronutils.model.field.expression.*; import com.cronutils.model.field.expression.Always;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import com.cronutils.model.field.expression.And;
import com.cronutils.model.field.expression.Between;
import com.cronutils.model.field.expression.Every;
import com.cronutils.model.field.expression.FieldExpression;
import com.cronutils.model.field.expression.On;
/** /**
* Cycle * Cycle
*/ */
public abstract class AbstractCycle { public abstract class AbstractCycle {
protected Cron cron; protected Cron cron;
protected CronField minField; protected CronField minField;
protected CronField hourField; protected CronField hourField;
protected CronField dayOfMonthField; protected CronField dayOfMonthField;
protected CronField dayOfWeekField; protected CronField dayOfWeekField;
protected CronField monthField; protected CronField monthField;
protected CronField yearField; protected CronField yearField;
public CycleLinks addCycle(AbstractCycle cycle) { public CycleLinks addCycle(AbstractCycle cycle) {
return new CycleLinks(this.cron).addCycle(this).addCycle(cycle); return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
} }
/** /**
* cycle constructor * cycle constructor
* @param cron cron *
*/ * @param cron cron
public AbstractCycle(Cron cron) { */
if (cron == null) { protected AbstractCycle(Cron cron) {
throw new IllegalArgumentException("cron must not be null!"); if (cron == null) {
throw new IllegalArgumentException("cron must not be null!");
}
this.cron = cron;
this.minField = cron.retrieve(CronFieldName.MINUTE);
this.hourField = cron.retrieve(CronFieldName.HOUR);
this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
this.monthField = cron.retrieve(CronFieldName.MONTH);
this.yearField = cron.retrieve(CronFieldName.YEAR);
}
/**
* whether the minute field has a value
*
* @return if minute field has a value return true,else return false
*/
protected boolean minFiledIsSetAll() {
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always
|| minFieldExpression instanceof Between || minFieldExpression instanceof And
|| minFieldExpression instanceof On);
}
/**
* whether the minute field has a value of every or always
*
* @return if minute field has a value of every or always return true,else return false
*/
protected boolean minFiledIsEvery() {
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always);
}
/**
* whether the hour field has a value
*
* @return if hour field has a value return true,else return false
*/
protected boolean hourFiledIsSetAll() {
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always
|| hourFieldExpression instanceof Between || hourFieldExpression instanceof And
|| hourFieldExpression instanceof On);
}
/**
* whether the hour field has a value of every or always
*
* @return if hour field has a value of every or always return true,else return false
*/
protected boolean hourFiledIsEvery() {
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always);
}
/**
* whether the day Of month field has a value
*
* @return if day Of month field has a value return true,else return false
*/
protected boolean dayOfMonthFieldIsSetAll() {
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always
|| dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And
|| dayOfMonthField.getExpression() instanceof On);
}
/**
* whether the day Of Month field has a value of every or always
*
* @return if day Of Month field has a value of every or always return true,else return false
*/
protected boolean dayOfMonthFieldIsEvery() {
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always);
}
/**
* whether month field has a value
*
* @return if month field has a value return true,else return false
*/
protected boolean monthFieldIsSetAll() {
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always
|| monthFieldExpression instanceof Between || monthFieldExpression instanceof And
|| monthFieldExpression instanceof On);
}
/**
* whether the month field has a value of every or always
*
* @return if month field has a value of every or always return true,else return false
*/
protected boolean monthFieldIsEvery() {
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always);
}
/**
* whether the day Of week field has a value
*
* @return if day Of week field has a value return true,else return false
*/
protected boolean dayofWeekFieldIsSetAll() {
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always
|| dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And
|| dayOfWeekFieldExpression instanceof On);
}
/**
* whether the day Of week field has a value of every or always
*
* @return if day Of week field has a value of every or always return true,else return false
*/
protected boolean dayofWeekFieldIsEvery() {
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always);
} }
this.cron = cron; /**
this.minField = cron.retrieve(CronFieldName.MINUTE); * get cycle enum
this.hourField = cron.retrieve(CronFieldName.HOUR); *
this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH); * @return CycleEnum
this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK); */
this.monthField = cron.retrieve(CronFieldName.MONTH); protected abstract CycleEnum getCycle();
this.yearField = cron.retrieve(CronFieldName.YEAR);
} /**
* get mini level cycle enum
/** *
* whether the minute field has a value * @return CycleEnum
* @return if minute field has a value return true,else return false */
*/ protected abstract CycleEnum getMiniCycle();
protected boolean minFiledIsSetAll(){
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always
|| minFieldExpression instanceof Between || minFieldExpression instanceof And
|| minFieldExpression instanceof On);
}
/**
* whether the minute field has a value of every or always
* @return if minute field has a value of every or always return true,else return false
*/
protected boolean minFiledIsEvery(){
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always);
}
/**
* whether the hour field has a value
* @return if hour field has a value return true,else return false
*/
protected boolean hourFiledIsSetAll(){
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always
|| hourFieldExpression instanceof Between || hourFieldExpression instanceof And
|| hourFieldExpression instanceof On);
}
/**
* whether the hour field has a value of every or always
* @return if hour field has a value of every or always return true,else return false
*/
protected boolean hourFiledIsEvery(){
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always);
}
/**
* whether the day Of month field has a value
* @return if day Of month field has a value return true,else return false
*/
protected boolean dayOfMonthFieldIsSetAll(){
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always
|| dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And
|| dayOfMonthField.getExpression() instanceof On);
}
/**
* whether the day Of Month field has a value of every or always
* @return if day Of Month field has a value of every or always return true,else return false
*/
protected boolean dayOfMonthFieldIsEvery(){
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always);
}
/**
* whether month field has a value
* @return if month field has a value return true,else return false
*/
protected boolean monthFieldIsSetAll(){
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always
|| monthFieldExpression instanceof Between || monthFieldExpression instanceof And
|| monthFieldExpression instanceof On);
}
/**
* whether the month field has a value of every or always
* @return if month field has a value of every or always return true,else return false
*/
protected boolean monthFieldIsEvery(){
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always);
}
/**
* whether the day Of week field has a value
* @return if day Of week field has a value return true,else return false
*/
protected boolean dayofWeekFieldIsSetAll(){
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always
|| dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And
|| dayOfWeekFieldExpression instanceof On);
}
/**
* whether the day Of week field has a value of every or always
* @return if day Of week field has a value of every or always return true,else return false
*/
protected boolean dayofWeekFieldIsEvery(){
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always);
}
/**
* get cycle enum
* @return CycleEnum
*/
protected abstract CycleEnum getCycle();
/**
* get mini level cycle enum
* @return CycleEnum
*/
protected abstract CycleEnum getMiniCycle();
} }
...@@ -14,322 +14,329 @@ ...@@ -14,322 +14,329 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.zk; package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.recipes.locks.InterProcessMutex; import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* abstract zookeeper client * abstract zookeeper client
*/ */
@Component @Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator { public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
/** * remove dead server by host
* remove dead server by host *
* @param host host * @param host host
* @param serverType serverType * @param serverType serverType
* @throws Exception */
*/ public void removeDeadServerByHost(String host, String serverType) {
public void removeDeadServerByHost(String host, String serverType) throws Exception { List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); for (String serverPath : deadServers) {
for(String serverPath : deadServers){ if (serverPath.startsWith(serverType + UNDERLINE + host)) {
if(serverPath.startsWith(serverType+UNDERLINE+host)){ String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; super.remove(server);
super.remove(server); logger.info("{} server {} deleted from zk dead server path success", serverType, host);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host); }
} }
} }
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
/** * opType(delete): delete path from zk
* opType(add): if find dead server , then add to zk deadServerPath *
* opType(delete): delete path from zk * @param zNode node path
* * @param zkNodeType master or worker
* @param zNode node path * @param opType delete or add
* @param zkNodeType master or worker */
* @param opType delete or add public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) {
* @throws Exception errors String host = getHostByEventDataPath(zNode);
*/ String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
String host = getHostByEventDataPath(zNode); //check server restart, if restart , dead server path in zk should be delete
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
//check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){ } else if (opType.equals(ADD_ZK_OP)) {
removeDeadServerByHost(host, type); String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!super.isExisted(deadServerPath)) {
}else if(opType.equals(ADD_ZK_OP)){ //add dead server info to zk dead server path : /dead-servers/
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if(!super.isExisted(deadServerPath)){ super.persist(deadServerPath, (type + UNDERLINE + host));
//add dead server info to zk dead server path : /dead-servers/
logger.info("{} server dead , and {} added to zk dead server path success",
super.persist(deadServerPath,(type + UNDERLINE + host)); zkNodeType, zNode);
}
logger.info("{} server dead , and {} added to zk dead server path success" , }
zkNodeType.toString(), zNode);
} }
}
/**
} * get active master num
*
/** * @return active master number
* get active master num */
* @return active master number public int getActiveMasterNum() {
*/ List<String> childrenList = new ArrayList<>();
public int getActiveMasterNum(){ try {
List<String> childrenList = new ArrayList<>(); // read master node parent path from conf
try { if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
// read master node parent path from conf childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){ }
childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER)); } catch (Exception e) {
} logger.error("getActiveMasterNum error", e);
} catch (Exception e) { }
logger.error("getActiveMasterNum error",e); return childrenList.size();
} }
return childrenList.size();
} /**
* @return zookeeper quorum
/** */
* public String getZookeeperQuorum() {
* @return zookeeper quorum return getZookeeperConfig().getServerList();
*/ }
public String getZookeeperQuorum(){
return getZookeeperConfig().getServerList(); /**
} * get server list.
*
/** * @param zkNodeType zookeeper node type
* get server list. * @return server list
* @param zkNodeType zookeeper node type */
* @return server list public List<Server> getServersList(ZKNodeType zkNodeType) {
*/ Map<String, String> masterMap = getServerMaps(zkNodeType);
public List<Server> getServersList(ZKNodeType zkNodeType){ String parentPath = getZNodeParentPath(zkNodeType);
Map<String, String> masterMap = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType); List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
List<Server> masterServers = new ArrayList<>(); Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
for (Map.Entry<String, String> entry : masterMap.entrySet()) { if (masterServer == null) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); continue;
if(masterServer == null){ }
continue; String key = entry.getKey();
} masterServer.setZkDirectory(parentPath + "/" + key);
String key = entry.getKey(); //set host and port
masterServer.setZkDirectory(parentPath + "/"+ key); String[] hostAndPort = key.split(COLON);
//set host and port String[] hosts = hostAndPort[0].split(DIVISION_STRING);
String[] hostAndPort=key.split(COLON); // fetch the last one
String[] hosts=hostAndPort[0].split(DIVISION_STRING); masterServer.setHost(hosts[hosts.length - 1]);
// fetch the last one masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServer.setHost(hosts[hosts.length-1]); masterServers.add(masterServer);
masterServer.setPort(Integer.parseInt(hostAndPort[1])); }
masterServers.add(masterServer); return masterServers;
} }
return masterServers;
} /**
* get master server list map.
/** *
* get master server list map. * @param zkNodeType zookeeper node type
* @param zkNodeType zookeeper node type * @return result : {host : resource info}
* @return result : {host : resource info} */
*/ public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
Map<String, String> masterMap = new HashMap<>();
Map<String, String> masterMap = new HashMap<>(); try {
try { String path = getZNodeParentPath(zkNodeType);
String path = getZNodeParentPath(zkNodeType); List<String> serverList = super.getChildrenKeys(path);
List<String> serverList = super.getChildrenKeys(path); if (zkNodeType == ZKNodeType.WORKER) {
if(zkNodeType == ZKNodeType.WORKER){ List<String> workerList = new ArrayList<>();
List<String> workerList = new ArrayList<>(); for (String group : serverList) {
for(String group : serverList){ List<String> groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
List<String> groupServers = super.getChildrenKeys(path + Constants.SLASH + group); for (String groupServer : groupServers) {
for(String groupServer : groupServers){ workerList.add(group + Constants.SLASH + groupServer);
workerList.add(group + Constants.SLASH + groupServer); }
} }
} serverList = workerList;
serverList = workerList; }
} for (String server : serverList) {
for(String server : serverList){ masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); }
} } catch (Exception e) {
} catch (Exception e) { logger.error("get server list failed", e);
logger.error("get server list failed", e); }
}
return masterMap;
return masterMap; }
}
/**
/** * check the zookeeper node already exists
* check the zookeeper node already exists *
* @param host host * @param host host
* @param zkNodeType zookeeper node type * @param zkNodeType zookeeper node type
* @return true if exists * @return true if exists
*/ */
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) { public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
String path = getZNodeParentPath(zkNodeType); String path = getZNodeParentPath(zkNodeType);
if(StringUtils.isEmpty(path)){ if (StringUtils.isEmpty(path)) {
logger.error("check zk node exists error, host:{}, zk node type:{}", logger.error("check zk node exists error, host:{}, zk node type:{}",
host, zkNodeType.toString()); host, zkNodeType);
return false; return false;
} }
Map<String, String> serverMaps = getServerMaps(zkNodeType); Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){ for (String hostKey : serverMaps.keySet()) {
if(hostKey.contains(host)){ if (hostKey.contains(host)) {
return true; return true;
} }
} }
return false; return false;
} }
/** /**
* * @return get worker node parent path
* @return get worker node parent path */
*/ protected String getWorkerZNodeParentPath() {
protected String getWorkerZNodeParentPath(){ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; }
}
/**
/** * @return get master node parent path
* */
* @return get master node parent path protected String getMasterZNodeParentPath() {
*/ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
protected String getMasterZNodeParentPath(){ }
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
} /**
* @return get master lock path
/** */
* public String getMasterLockPath() {
* @return get master lock path return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
*/ }
public String getMasterLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; /**
} * @param zkNodeType zookeeper node type
* @return get zookeeper node parent path
/** */
* public String getZNodeParentPath(ZKNodeType zkNodeType) {
* @param zkNodeType zookeeper node type String path = "";
* @return get zookeeper node parent path switch (zkNodeType) {
*/ case MASTER:
public String getZNodeParentPath(ZKNodeType zkNodeType) { return getMasterZNodeParentPath();
String path = ""; case WORKER:
switch (zkNodeType){ return getWorkerZNodeParentPath();
case MASTER: case DEAD_SERVER:
return getMasterZNodeParentPath(); return getDeadZNodeParentPath();
case WORKER: default:
return getWorkerZNodeParentPath(); break;
case DEAD_SERVER: }
return getDeadZNodeParentPath(); return path;
default: }
break;
} /**
return path; * @return get dead server node parent path
} */
protected String getDeadZNodeParentPath() {
/** return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
* }
* @return get dead server node parent path
*/ /**
protected String getDeadZNodeParentPath(){ * @return get master start up lock path
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; */
} public String getMasterStartUpLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
/** }
*
* @return get master start up lock path /**
*/ * @return get master failover lock path
public String getMasterStartUpLockPath(){ */
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; public String getMasterFailoverLockPath() {
} return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
}
/**
* /**
* @return get master failover lock path * @return get worker failover lock path
*/ */
public String getMasterFailoverLockPath(){ public String getWorkerFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
} }
/** /**
* * release mutex
* @return get worker failover lock path *
*/ * @param mutex mutex
public String getWorkerFailoverLockPath(){ */
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; public void releaseMutex(InterProcessMutex mutex) {
} if (mutex != null) {
try {
/** mutex.release();
* release mutex } catch (Exception e) {
* @param mutex mutex if ("instance must be started before calling this method".equals(e.getMessage())) {
*/ logger.warn("lock release");
public void releaseMutex(InterProcessMutex mutex) { } else {
if (mutex != null){ logger.error("lock release failed", e);
try { }
mutex.release();
} catch (Exception e) { }
if("instance must be started before calling this method".equals(e.getMessage())){ }
logger.warn("lock release"); }
}else{
logger.error("lock release failed",e); /**
} * init system znode
*/
} protected void initSystemZNode() {
} try {
} persist(getMasterZNodeParentPath(), "");
persist(getWorkerZNodeParentPath(), "");
/** persist(getDeadZNodeParentPath(), "");
* init system znode
*/ logger.info("initialize server nodes success.");
protected void initSystemZNode(){ } catch (Exception e) {
try { logger.error("init system znode failed", e);
persist(getMasterZNodeParentPath(), ""); }
persist(getWorkerZNodeParentPath(), ""); }
persist(getDeadZNodeParentPath(), "");
/**
logger.info("initialize server nodes success."); * get host ip, string format: masterParentPath/ip
} catch (Exception e) { *
logger.error("init system znode failed",e); * @param path path
} * @return host ip, string format: masterParentPath/ip
} */
protected String getHostByEventDataPath(String path) {
/** if (StringUtils.isEmpty(path)) {
* get host ip, string format: masterParentPath/ip logger.error("empty path!");
* @param path path return "";
* @return host ip, string format: masterParentPath/ip }
*/ String[] pathArray = path.split(SINGLE_SLASH);
protected String getHostByEventDataPath(String path) { if (pathArray.length < 1) {
if(StringUtils.isEmpty(path)){ logger.error("parse ip error: {}", path);
logger.error("empty path!"); return "";
return ""; }
} return pathArray[pathArray.length - 1];
String[] pathArray = path.split(SINGLE_SLASH);
if(pathArray.length < 1){ }
logger.error("parse ip error: {}", path);
return ""; @Override
} public String toString() {
return pathArray[pathArray.length - 1]; return "AbstractZKClient{"
+ "zkClient=" + getZkClient()
} + ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\''
@Override + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\''
public String toString() { + '}';
return "AbstractZKClient{" + }
"zkClient=" + getZkClient() + }
", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
'}';
}
}
\ No newline at end of file
...@@ -14,9 +14,14 @@ ...@@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.zk; package org.apache.dolphinscheduler.service.zk;
import org.apache.commons.lang.StringUtils; import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.ACLProvider;
...@@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry; ...@@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/** /**
* Shared Curator zookeeper client * Shared Curator zookeeper client
*/ */
...@@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements InitializingBean { ...@@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements InitializingBean {
private CuratorFramework zkClient; private CuratorFramework zkClient;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient(); this.zkClient = buildClient();
...@@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements InitializingBean { ...@@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements InitializingBean {
zkClient.blockUntilConnected(30, TimeUnit.SECONDS); zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
} catch (final Exception ex) { } catch (final Exception ex) {
throw new RuntimeException(ex); throw new ServiceException(ex);
} }
return zkClient; return zkClient;
} }
...@@ -123,4 +125,4 @@ public class CuratorZookeeperClient implements InitializingBean { ...@@ -123,4 +125,4 @@ public class CuratorZookeeperClient implements InitializingBean {
public CuratorFramework getZkClient() { public CuratorFramework getZkClient() {
return zkClient; return zkClient;
} }
} }
\ No newline at end of file
...@@ -14,19 +14,22 @@ ...@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.zk; package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* just speed experience version * just speed experience version
...@@ -51,10 +54,10 @@ public class ZKServer { ...@@ -51,10 +54,10 @@ public class ZKServer {
ZKServer zkServer; ZKServer zkServer;
if (args.length == 0) { if (args.length == 0) {
zkServer = new ZKServer(); zkServer = new ZKServer();
} else if (args.length == 1){ } else if (args.length == 1) {
zkServer = new ZKServer(Integer.valueOf(args[0]), ""); zkServer = new ZKServer(Integer.parseInt(args[0]), "");
} else { } else {
zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]); zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]);
} }
zkServer.registerHook(); zkServer.registerHook();
zkServer.start(); zkServer.start();
...@@ -73,7 +76,7 @@ public class ZKServer { ...@@ -73,7 +76,7 @@ public class ZKServer {
} }
private void registerHook() { private void registerHook() {
/** /*
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
...@@ -90,7 +93,7 @@ public class ZKServer { ...@@ -90,7 +93,7 @@ public class ZKServer {
} }
} }
public boolean isStarted(){ public boolean isStarted() {
return isStarted.get(); return isStarted.get();
} }
...@@ -119,19 +122,19 @@ public class ZKServer { ...@@ -119,19 +122,19 @@ public class ZKServer {
if (file.exists()) { if (file.exists()) {
logger.warn("The path of zk server exists"); logger.warn("The path of zk server exists");
} }
logger.info("zk server starting, data dir path:{}" , zkDataDir); logger.info("zk server starting, data dir path:{}", zkDataDir);
startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60"); startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME, "60");
} }
/** /**
* Starts a local Zk instance * Starts a local Zk instance
* *
* @param port The port to listen on * @param port The port to listen on
* @param dataDirPath The path for the Zk data directory * @param dataDirPath The path for the Zk data directory
* @param tickTime zk tick time * @param tickTime zk tick time
* @param maxClientCnxns zk max client connections * @param maxClientCnxns zk max client connections
*/ */
private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) { private void startLocalZkServer(final int port, final String dataDirPath, final int tickTime, String maxClientCnxns) {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
zooKeeperServerMain = new PublicZooKeeperServerMain(); zooKeeperServerMain = new PublicZooKeeperServerMain();
logger.info("Zookeeper data path : {} ", dataDirPath); logger.info("Zookeeper data path : {} ", dataDirPath);
...@@ -144,8 +147,7 @@ public class ZKServer { ...@@ -144,8 +147,7 @@ public class ZKServer {
zooKeeperServerMain.initializeAndRun(args); zooKeeperServerMain.initializeAndRun(args);
} catch (QuorumPeerConfig.ConfigException | IOException e) { } catch (QuorumPeerConfig.ConfigException | IOException e) {
logger.warn("Caught exception while starting ZK", e); throw new ServiceException("Caught exception while starting ZK", e);
throw new RuntimeException(e);
} }
} }
} }
...@@ -159,7 +161,7 @@ public class ZKServer { ...@@ -159,7 +161,7 @@ public class ZKServer {
logger.info("zk server stopped"); logger.info("zk server stopped");
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to stop ZK ",e); logger.error("Failed to stop ZK ", e);
} }
} }
...@@ -180,8 +182,7 @@ public class ZKServer { ...@@ -180,8 +182,7 @@ public class ZKServer {
org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir)); org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("Caught exception while stopping ZK server", e); throw new ServiceException("Caught exception while starting ZK", e);
throw new RuntimeException(e);
} }
} }
} }
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册