提交 e0e0696b 编写于 作者: B bao liang 提交者: qiaozhanwei

add output log when master/worker servers start (#1769)

* merge hadoop.properties into common.properties

* merge hadoop,zookeeper.properties into common.properties
remove combined.properties/master.properties/worker.properties

* change db user/pwd to test/test

* rename .dolphinscheduler_env.sh to dolphinscheduler_env.sh
remove some unused in install.sh

* add output log when master/worker servers start...

* add start log when servers start
上级 7dfa9976
......@@ -49,7 +49,7 @@ login.user.keytab.username=hdfs-mycluster@ESZ.COM
login.user.keytab.path=/opt/hdfs.headless.keytab
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
dolphinscheduler.env.path=/opt/dolphinscheduler/conf/env/.dolphinscheduler_env.sh
dolphinscheduler.env.path=/opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh
#resource.view.suffixs
resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml
......
......@@ -471,6 +471,8 @@ public final class Constants {
public static final String PASSWORD = "password";
public static final String XXXXXX = "******";
public static final String NULL = "NULL";
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd";
......
......@@ -108,9 +108,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
}
}
}
......@@ -412,6 +412,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
persist(getWorkerZNodeParentPath(), "");
persist(getDeadZNodeParentPath(), "");
logger.info("initialize server nodes success.");
} catch (Exception e) {
logger.error("init system znode failed",e);
}
......
......@@ -63,7 +63,7 @@ login.user.keytab.username=hdfs-mycluster@ESZ.COM
login.user.keytab.path=/opt/hdfs.headless.keytab
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
dolphinscheduler.env.path=/opt/.dolphinscheduler_env.sh
dolphinscheduler.env.path=/opt/dolphinscheduler_env.sh
#resource.view.suffixs
resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
......
......@@ -95,6 +95,45 @@ mybatis-plus.configuration.cache-enabled=false
mybatis-plus.configuration.call-setters-on-nulls=true
mybatis-plus.configuration.jdbc-type-for-null=null
# master settings
# master execute thread num
master.exec.threads=100
# master execute task number in parallel
master.exec.task.num=20
# master heartbeat interval
master.heartbeat.interval=10
# master commit task retry times
master.task.commit.retryTimes=5
# master commit task interval
master.task.commit.interval=1000
# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2
master.max.cpuload.avg=100
# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
master.reserved.memory=0.1
# worker settings
# worker execute thread num
worker.exec.threads=100
# worker heartbeat interval
worker.heartbeat.interval=10
# submit the number of tasks at a time
worker.fetch.task.num = 3
# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
worker.max.cpuload.avg=100
# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
worker.reserved.memory=0.1
# data quality analysis is not currently in use. please ignore the following configuration
# task record
task.record.flag=false
......
......@@ -97,7 +97,9 @@ public class MasterServer implements IStoppable {
* @param args arguments
*/
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
}
/**
......@@ -133,6 +135,7 @@ public class MasterServer implements IStoppable {
// start QuartzExecutors
// what system should do if exception
try {
logger.info("start Quartz server...");
ProcessScheduleJob.init(processDao);
QuartzExecutors.getInstance().start();
} catch (Exception e) {
......@@ -239,6 +242,7 @@ public class MasterServer implements IStoppable {
* @return
*/
private Runnable heartBeatThread(){
logger.info("start master heart beat thread...");
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
......
......@@ -22,25 +22,25 @@ import org.springframework.stereotype.Component;
@Component
public class MasterConfig {
@Value("${master.exec.threads:100}")
@Value("${master.exec.threads}")
private int masterExecThreads;
@Value("${master.exec.task.num:20}")
@Value("${master.exec.task.num}")
private int masterExecTaskNum;
@Value("${master.heartbeat.interval:10}")
@Value("${master.heartbeat.interval}")
private int masterHeartbeatInterval;
@Value("${master.task.commit.retryTimes:5}")
@Value("${master.task.commit.retryTimes}")
private int masterTaskCommitRetryTimes;
@Value("${master.task.commit.interval:100}")
@Value("${master.task.commit.interval}")
private int masterTaskCommitInterval;
@Value("${master.max.cpuload.avg:100}")
@Value("${master.max.cpuload.avg}")
private double masterMaxCpuloadAvg;
@Value("${master.reserved.memory:0.1}")
@Value("${master.reserved.memory}")
private double masterReservedMemory;
public int getMasterExecThreads() {
......
......@@ -37,7 +37,7 @@ public class MasterLogFilter extends Filter<ILoggingEvent> {
*/
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("Master-")){
if (event.getThreadName().startsWith("Master-") ){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
......
......@@ -90,6 +90,7 @@ public class MasterSchedulerThread implements Runnable {
*/
@Override
public void run() {
logger.info("master scheduler start successfully...");
while (Stopper.isRunning()){
// process instance
......@@ -139,6 +140,7 @@ public class MasterSchedulerThread implements Runnable {
AbstractZKClient.releaseMutex(mutex);
}
}
logger.info("master server stopped...");
}
......
......@@ -24,7 +24,7 @@ import org.springframework.stereotype.Component;
* zookeeper conf
*/
@Component
@PropertySource("classpath:config/run_config.conf")
@PropertySource("classpath:config/install_config.conf")
public class RunConfig {
//zk connect config
......
......@@ -129,6 +129,7 @@ public class WorkerServer implements IStoppable {
* @param args arguments
*/
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
}
......@@ -138,6 +139,8 @@ public class WorkerServer implements IStoppable {
*/
@PostConstruct
public void run(){
logger.info("start worker server...");
zkWorkerClient.init();
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
......@@ -264,6 +267,7 @@ public class WorkerServer implements IStoppable {
* @return
*/
private Runnable heartBeatThread(){
logger.info("start worker heart beat thread...");
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
......@@ -288,6 +292,7 @@ public class WorkerServer implements IStoppable {
Runnable killProcessThread = new Runnable() {
@Override
public void run() {
logger.info("start listening kill process thread...");
while (Stopper.isRunning()){
Set<String> taskInfoSet = taskQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_KILL);
if (CollectionUtils.isNotEmpty(taskInfoSet)){
......
......@@ -22,19 +22,19 @@ import org.springframework.stereotype.Component;
@Component
public class WorkerConfig {
@Value("${worker.exec.threads:100}")
@Value("${worker.exec.threads}")
private int workerExecThreads;
@Value("${worker.heartbeat.interval:10}")
@Value("${worker.heartbeat.interval}")
private int workerHeartbeatInterval;
@Value("${worker.fetch.task.num:3}")
@Value("${worker.fetch.task.num}")
private int workerFetchTaskNum;
@Value("${worker.max.cpuload.avg:10}")
@Value("${worker.max.cpuload.avg}")
private int workerMaxCpuloadAvg;
@Value("${master.reserved.memory:1}")
@Value("${master.reserved.memory}")
private double workerReservedMemory;
public int getWorkerExecThreads() {
......
......@@ -136,6 +136,7 @@ public class FetchTaskThread implements Runnable{
@Override
public void run() {
logger.info("worker start fetch tasks...");
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
......
......@@ -84,6 +84,9 @@ public class ZKMasterClient extends AbstractZKClient {
* init
*/
public void init(){
logger.info("initialize master client...");
// init dao
this.initDao();
......
......@@ -51,6 +51,7 @@ public class ZKWorkerClient extends AbstractZKClient {
*/
public void init(){
logger.info("initialize worker client...");
// init system znode
this.initSystemZNode();
......
......@@ -19,3 +19,7 @@ installPath=/data1_1T/dolphinscheduler
deployUser=dolphinscheduler
ips=ark0,ark1,ark2,ark3,ark4
sshPort=22
masters=ark0,ark1
workers=ark2,ark3,ark4
alertServer=ark3
apiServers=ark1
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
masters=ark0,ark1
workers=ark2,ark3,ark4
alertServer=ark3
apiServers=ark1
sshPort=22
\ No newline at end of file
......@@ -33,7 +33,7 @@ public class EnvFileTest {
@Test
public void test() {
String path = System.getProperty("user.dir")+"/script/env/.dolphinscheduler_env.sh";
String path = System.getProperty("user.dir")+"/script/env/dolphinscheduler_env.sh";
String pythonHome = getPythonHome(path);
logger.info(pythonHome);
}
......
......@@ -47,7 +47,6 @@ else
exit 1
fi
source ${workDir}/conf/config/run_config.conf
source ${workDir}/conf/config/install_config.conf
# for example postgresql or mysql ...
......@@ -86,7 +85,6 @@ ips="ark0,ark1,ark2,ark3,ark4"
# Note: if ssh port is not default, modify here
sshPort=22
# conf/config/run_config.conf config
# run master machine
# Note: list of hosts hostname for deploying master
masters="ark0,ark1"
......@@ -147,9 +145,6 @@ enterpriseWechatAgentId="xxxxxxxxxx"
# Enterprise WeChat user configuration, multiple users to , split
enterpriseWechatUsers="xxxxx,xxxxx"
# alert port
alertPort=7789
# whether to start monitoring self-starting scripts
monitorServerState="false"
......@@ -192,7 +187,7 @@ downloadPath="/tmp/dolphinscheduler/download"
execPath="/tmp/dolphinscheduler/exec"
# SHELL environmental variable path
shellEnvPath="$installPath/conf/env/.dolphinscheduler_env.sh"
shellEnvPath="$installPath/conf/env/dolphinscheduler_env.sh"
# suffix of the resource file
resSuffixs="txt,log,sh,conf,cfg,py,java,sql,hql,xml"
......@@ -225,7 +220,7 @@ zkSessionTimeout="300"
zkConnectionTimeout="300"
# zk retry interval
zkRetrySleep="100"
zkRetryMaxSleep="100"
# zk retry maximum number of times
zkRetryMaxtime="5"
......@@ -245,19 +240,15 @@ masterHeartbeatInterval="10"
masterTaskCommitRetryTimes="5"
# master task submission retry interval
masterTaskCommitInterval="100"
masterTaskCommitInterval="1000"
# master maximum cpu average load, used to determine whether the master has execution capability
masterMaxCpuLoadAvg="10"
masterMaxCpuLoadAvg="100"
# master reserve memory to determine if the master has execution capability
masterReservedMemory="1"
# master port
masterPort=5566
masterReservedMemory="0.1"
# worker config
# worker config
# worker execution thread
workerExecThreads="100"
......@@ -268,11 +259,7 @@ workerHeartbeatInterval="10"
workerFetchTaskNum="3"
# worker reserve memory to determine if the master has execution capability
workerReservedMemory="1"
# master port
workerPort=7788
workerReservedMemory="0.1"
# api config
# api server port
......@@ -322,7 +309,17 @@ if [ $dbtype == "postgresql" ];then
sed -i ${txt} "s#org.quartz.jobStore.driverDelegateClass.*#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate#g" conf/quartz.properties
fi
sed -i ${txt} "s#master.exec.threads.*#master.exec.threads=${masterExecThreads}#g" conf/application.properties
sed -i ${txt} "s#master.exec.task.num.*#master.exec.task.num=${masterExecTaskNum}#g" conf/application.properties
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/application.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/application.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/application.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/application.properties
sed -i ${txt} "s#worker.exec.threads.*#worker.exec.threads=${workerExecThreads}#g" conf/application.properties
sed -i ${txt} "s#worker.heartbeat.interval.*#worker.heartbeat.interval=${workerHeartbeatInterval}#g" conf/application.properties
sed -i ${txt} "s#worker.fetch.task.num.*#worker.fetch.task.num=${workerFetchTaskNum}#g" conf/application.properties
sed -i ${txt} "s#worker.reserved.memory.*#worker.reserved.memory=${workerReservedMemory}#g" conf/application.properties
sed -i ${txt} "s#fs.defaultFS.*#fs.defaultFS=${defaultFS}#g" conf/common.properties
sed -i ${txt} "s#fs.s3a.endpoint.*#fs.s3a.endpoint=${s3Endpoint}#g" conf/common.properties
......@@ -331,7 +328,6 @@ sed -i ${txt} "s#fs.s3a.secret.key.*#fs.s3a.secret.key=${s3SecretKey}#g" conf/co
sed -i ${txt} "s#yarn.resourcemanager.ha.rm.ids.*#yarn.resourcemanager.ha.rm.ids=${yarnHaIps}#g" conf/common.properties
sed -i ${txt} "s#yarn.application.status.address.*#yarn.application.status.address=http://${singleYarnIp}:8088/ws/v1/cluster/apps/%s#g" conf/common.properties
sed -i ${txt} "s#data.basedir.path.*#data.basedir.path=${programPath}#g" conf/common.properties
sed -i ${txt} "s#data.download.basedir.path.*#data.download.basedir.path=${downloadPath}#g" conf/common.properties
sed -i ${txt} "s#process.exec.basepath.*#process.exec.basepath=${execPath}#g" conf/common.properties
......@@ -350,7 +346,7 @@ sed -i ${txt} "s#zookeeper.quorum.*#zookeeper.quorum=${zkQuorum}#g" conf/common.
sed -i ${txt} "s#zookeeper.dolphinscheduler.root.*#zookeeper.dolphinscheduler.root=${zkRoot}#g" conf/common.properties
sed -i ${txt} "s#zookeeper.session.timeout.*#zookeeper.session.timeout=${zkSessionTimeout}#g" conf/common.properties
sed -i ${txt} "s#zookeeper.connection.timeout.*#zookeeper.connection.timeout=${zkConnectionTimeout}#g" conf/common.properties
sed -i ${txt} "s#zookeeper.retry.sleep.*#zookeeper.retry.sleep=${zkRetrySleep}#g" conf/common.properties
sed -i ${txt} "s#zookeeper.retry.max.sleep.*#zookeeper.retry.max.sleep=${zkRetryMaxSleep}#g" conf/common.properties
sed -i ${txt} "s#zookeeper.retry.maxtime.*#zookeeper.retry.maxtime=${zkRetryMaxtime}#g" conf/common.properties
sed -i ${txt} "s#server.port.*#server.port=${apiServerPort}#g" conf/application-api.properties
......@@ -384,11 +380,11 @@ sed -i ${txt} "s#ips.*#ips=${ips}#g" conf/config/install_config.conf
sed -i ${txt} "s#sshPort.*#sshPort=${sshPort}#g" conf/config/install_config.conf
sed -i ${txt} "s#masters.*#masters=${masters}#g" conf/config/run_config.conf
sed -i ${txt} "s#workers.*#workers=${workers}#g" conf/config/run_config.conf
sed -i ${txt} "s#alertServer.*#alertServer=${alertServer}#g" conf/config/run_config.conf
sed -i ${txt} "s#apiServers.*#apiServers=${apiServers}#g" conf/config/run_config.conf
sed -i ${txt} "s#sshPort.*#sshPort=${sshPort}#g" conf/config/run_config.conf
sed -i ${txt} "s#masters.*#masters=${masters}#g" conf/config/install_config.conf
sed -i ${txt} "s#workers.*#workers=${workers}#g" conf/config/install_config.conf
sed -i ${txt} "s#alertServer.*#alertServer=${alertServer}#g" conf/config/install_config.conf
sed -i ${txt} "s#apiServers.*#apiServers=${apiServers}#g" conf/config/install_config.conf
sed -i ${txt} "s#sshPort.*#sshPort=${sshPort}#g" conf/config/install_config.conf
# 2,create directory
......
......@@ -18,7 +18,6 @@
workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
source $workDir/../conf/config/run_config.conf
source $workDir/../conf/config/install_config.conf
hostsArr=(${ips//,/ })
......
......@@ -18,7 +18,6 @@
workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
source $workDir/../conf/config/run_config.conf
source $workDir/../conf/config/install_config.conf
mastersHost=(${masters//,/ })
......
......@@ -19,7 +19,6 @@
workDir=`dirname $0`
workDir=`cd ${workDir};pwd`
source $workDir/../conf/config/run_config.conf
source $workDir/../conf/config/install_config.conf
mastersHost=(${masters//,/ })
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册