提交 18e8703b 编写于 作者: Q qiaozhanwei 提交者: lgcareer

remove kazoo (#1575)

* remove LogViewServiceGrpc.java file and pom modify

* remove kazoo

* remove kazoo

* remove kazoo

* remove common monitor package

* add license

* remove kazoo modify

* remove kazoo modify

* remove kazoo modify

* remove kazoo modify

* remove kazoo modify

* remove kazoo modify

* install.sh remove python kazoo

* add system param whether repeat running

* remove kazoo modify

* BusinessTimeUtils remove whther repeat running inner param
上级 5f4146eb
......@@ -36,6 +36,7 @@ public final class Constants {
*/
public static final String HADOOP_PROPERTIES_PATH = "/common/hadoop/hadoop.properties";
/**
* common properties path
*/
......@@ -1005,4 +1006,5 @@ public final class Constants {
public static final String CLASS = "class";
public static final String RECEIVERS = "receivers";
public static final String RECEIVERS_CC = "receiversCc";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* abstract server monitor and auto restart server
*/
@Component
public abstract class AbstractMonitor implements Monitor {
private static final Logger logger = LoggerFactory.getLogger(AbstractMonitor.class);
@Autowired
private RunConfig runConfig;
/**
* monitor server and restart
*/
@Override
public void monitor(String masterPath,String workerPath,Integer port,String installPath) {
try {
restartServer(masterPath,port,installPath);
restartServer(workerPath,port,installPath);
}catch (Exception e){
logger.error("server start up error",e);
}
}
private void restartServer(String path,Integer port,String installPath) throws Exception{
String type = path.split("/")[2];
String serverName = null;
String nodes = null;
if ("masters".equals(type)){
serverName = "master-server";
nodes = runConfig.getMasters();
}else if ("workers".equals(type)){
serverName = "worker-server";
nodes = runConfig.getWorkers();
}
Map<String, String> activeNodeMap = getActiveNodesByPath(path);
Set<String> needRestartServer = getNeedRestartServer(getRunConfigServer(nodes),
activeNodeMap.keySet());
for (String node : needRestartServer){
// os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(master) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start master-server')
String runCmd = "ssh -p " + port + " " + node + " sh " + installPath + "/bin/dolphinscheduler-daemon.sh start " + serverName;
Runtime.getRuntime().exec(runCmd);
}
}
/**
* get need restart server
* @param deployedNodes deployedNodes
* @param activeNodes activeNodes
* @return need restart server
*/
private Set<String> getNeedRestartServer(Set<String> deployedNodes,Set<String> activeNodes){
if (CollectionUtils.isEmpty(activeNodes)){
return deployedNodes;
}
Set<String> result = new HashSet<>();
result.addAll(deployedNodes);
result.removeAll(activeNodes);
return result;
}
/**
* run config masters/workers
* @return master set/worker set
*/
private Set<String> getRunConfigServer(String nodes){
Set<String> nodeSet = new HashSet();
if (StringUtils.isEmpty(nodes)){
return null;
}
String[] nodeArr = nodes.split(",");
for (String node : nodeArr){
nodeSet.add(node);
}
return nodeSet;
}
/**
* get active nodes by path
* @param path path
* @return active nodes
*/
protected abstract Map<String,String> getActiveNodesByPath(String path);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
/**
* server monitor and auto restart server
*/
public interface Monitor {
/**
* monitor server and restart
*/
void monitor(String masterPath, String workerPath, Integer port, String installPath);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
/**
* monitor server
*/
@ComponentScan("org.apache.dolphinscheduler")
public class MonitorServer implements CommandLineRunner {
private static Integer ARGS_LENGTH = 4;
private static final Logger logger = LoggerFactory.getLogger(MonitorServer.class);
/**
* monitor
*/
@Autowired
private Monitor monitor;
public static void main(String[] args) throws Exception{
new SpringApplicationBuilder(MonitorServer.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH){
logger.error("Usage: <masterPath> <workerPath> <port> <installPath>");
return;
}
String masterPath = args[0];
String workerPath = args[1];
Integer port = Integer.parseInt(args[2]);
String installPath = args[3];
monitor.monitor(masterPath,workerPath,port,installPath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* zookeeper conf
*/
@Component
@PropertySource("classpath:config/run_config.conf")
public class RunConfig {
//zk connect config
@Value("${masters}")
private String masters;
@Value("${workers}")
private String workers;
@Value("${alertServer}")
private String alertServer;
@Value("${apiServers}")
private String apiServers;
@Value("${sshPort}")
private String sshPort;
public String getMasters() {
return masters;
}
public void setMasters(String masters) {
this.masters = masters;
}
public String getWorkers() {
return workers;
}
public void setWorkers(String workers) {
this.workers = workers;
}
public String getAlertServer() {
return alertServer;
}
public void setAlertServer(String alertServer) {
this.alertServer = alertServer;
}
public String getApiServers() {
return apiServers;
}
public void setApiServers(String apiServers) {
this.apiServers = apiServers;
}
public String getSshPort() {
return sshPort;
}
public void setSshPort(String sshPort) {
this.sshPort = sshPort;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.monitor;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* zk monitor server impl
*/
@Component
public class ZKMonitorImpl extends AbstractMonitor {
/**
* zookeeper operator
*/
@Autowired
private ZookeeperOperator zookeeperOperator;
/**
* get active nodes map by path
* @param path path
* @return active nodes map
*/
@Override
protected Map<String,String> getActiveNodesByPath(String path) {
Map<String,String> maps = new HashMap<>();
List<String> childrenList = zookeeperOperator.getChildrenKeys(path);
if (childrenList == null){
return maps;
}
for (String child : childrenList){
maps.put(child.split("_")[0],child);
}
return maps;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan("org.apache.dolphinscheduler")
public class RemoveZKNode implements CommandLineRunner {
private static Integer ARGS_LENGTH = 1;
private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class);
/**
* zookeeper operator
*/
@Autowired
private ZookeeperOperator zookeeperOperator;
public static void main(String[] args) {
new SpringApplicationBuilder(RemoveZKNode.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH){
logger.error("Usage: <node>");
return;
}
zookeeperOperator.remove(args[0]);
zookeeperOperator.close();
}
}
......@@ -469,8 +469,8 @@ sh ${workDir}/script/stop-all.sh
# 4,delete zk node
echo "4,delete zk node"
sleep 1
python ${workDir}/script/del-zk-node.py $zkQuorum $zkRoot
sh ${workDir}/script/remove-zk-node.sh $zkRoot
# 5,scp resources
echo "5,scp resources"
......@@ -485,29 +485,4 @@ fi
# 6,startup
echo "6,startup"
sh ${workDir}/script/start-all.sh
# 7,start monitoring self-starting script
monitor_pid=${workDir}/monitor_server.pid
if [ "true" = $monitorServerState ];then
if [ -f $monitor_pid ]; then
TARGET_PID=`cat $monitor_pid`
if kill -0 $TARGET_PID > /dev/null 2>&1; then
echo "monitor server running as process ${TARGET_PID}.Stopping"
kill $TARGET_PID
sleep 5
if kill -0 $TARGET_PID > /dev/null 2>&1; then
echo "monitor server did not stop gracefully after 5 seconds: killing with kill -9"
kill -9 $TARGET_PID
fi
else
echo "no monitor server to stop"
fi
echo "monitor server running as process ${TARGET_PID}.Stopped success"
rm -f $monitor_pid
fi
nohup python -u ${workDir}/script/monitor-server.py $installPath $zkQuorum $zkMasters $zkWorkers > ${workDir}/monitor-server.log 2>&1 &
echo $! > $monitor_pid
echo "start monitor server success as process `cat $monitor_pid`"
fi
\ No newline at end of file
sh ${workDir}/script/start-all.sh
\ No newline at end of file
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
1, yum install pip
yum -y install python-pip
2, pip install kazoo
pip install kazoo
or
3, conda install kazoo
conda install -c conda-forge kazoo
run script and parameter description:
nohup python -u monitor_server.py /data1_1T/dolphinscheduler 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 /dolphinscheduler/masters /dolphinscheduler/workers> monitor_server.log 2>&1 &
the parameters are as follows:
/data1_1T/dolphinscheduler : the value comes from the installPath in install.sh
192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 : the value comes from zkQuorum in install.sh
the value comes from zkWorkers in install.sh
/dolphinscheduler/masters : the value comes from zkMasters in install.sh
/dolphinscheduler/workers : the value comes from zkWorkers in install.sh
'''
import sys
import socket
import os
import sched
import time
from datetime import datetime
from kazoo.client import KazooClient
schedule = sched.scheduler(time.time, time.sleep)
class ZkClient:
def __init__(self):
# hosts configuration zk address cluster
self.zk = KazooClient(hosts=zookeepers)
self.zk.start()
# read configuration files and assemble them into a dictionary
def read_file(self,path):
with open(path, 'r') as f:
dict = {}
for line in f.readlines():
arr = line.strip().split('=')
if (len(arr) == 2):
dict[arr[0]] = arr[1]
return dict
# get the ip address according to hostname
def get_ip_by_hostname(self,hostname):
return socket.gethostbyname(hostname)
# restart server
def restart_server(self,inc):
config_dict = self.read_file(install_path + '/conf/config/run_config.conf')
master_list = config_dict.get('masters').split(',')
print master_list
master_list = list(map(lambda item : self.get_ip_by_hostname(item),master_list))
worker_list = config_dict.get('workers').split(',')
print worker_list
worker_list = list(map(lambda item: self.get_ip_by_hostname(item), worker_list))
ssh_port = config_dict.get("sshPort")
print ssh_port
if (self.zk.exists(masters_zk_path)):
zk_master_list = []
zk_master_nodes = self.zk.get_children(masters_zk_path)
for zk_master_node in zk_master_nodes:
zk_master_list.append(zk_master_node.split('_')[0])
restart_master_list = list(set(master_list) - set(zk_master_list))
if (len(restart_master_list) != 0):
for master in restart_master_list:
print("master " + self.get_ip_by_hostname(master) + " server has down")
os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(master) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start master-server')
if (self.zk.exists(workers_zk_path)):
zk_worker_list = []
zk_worker_nodes = self.zk.get_children(workers_zk_path)
for zk_worker_node in zk_worker_nodes:
zk_worker_list.append(zk_worker_node.split('_')[0])
restart_worker_list = list(set(worker_list) - set(zk_worker_list))
if (len(restart_worker_list) != 0):
for worker in restart_worker_list:
print("worker " + self.get_ip_by_hostname(worker) + " server has down")
os.system('ssh -p ' + ssh_port + ' ' + self.get_ip_by_hostname(worker) + ' sh ' + install_path + '/bin/dolphinscheduler-daemon.sh start worker-server')
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
schedule.enter(inc, 0, self.restart_server, (inc,))
# default parameter 60s
def main(self,inc=60):
# the enter four parameters are: interval event, priority (sequence for simultaneous execution of two events arriving at the same time), function triggered by the call,
# the argument to the trigger function (tuple form)
schedule.enter(0, 0, self.restart_server, (inc,))
schedule.run()
if __name__ == '__main__':
if (len(sys.argv) < 4):
print('please input install_path,zookeepers,masters_zk_path and worker_zk_path')
install_path = sys.argv[1]
zookeepers = sys.argv[2]
masters_zk_path = sys.argv[3]
workers_zk_path = sys.argv[4]
zkClient = ZkClient()
zkClient.main(300)
\ No newline at end of file
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
print_usage(){
printf $"USAGE:$0 masterPath workerPath port installPath\n"
exit 1
}
if [ $# -ne 4 ];then
print_usage
fi
masterPath=$1
workerPath=$2
port=$3
installPath=$4
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
DOLPHINSCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.server.monitor.MonitorServer
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS $masterPath $workerPath $port $installPath"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
......@@ -15,20 +16,33 @@
# limitations under the License.
#
import time
import sys
from kazoo.client import KazooClient
class ZkClient:
def __init__(self):
self.zk = KazooClient(hosts=sys.argv[1])
self.zk.start()
def del_node(self):
self.zk.delete(sys.argv[2], recursive=True)
print('deleted success')
def __del__(self):
self.zk.stop()
if __name__ == '__main__':
zkclient = ZkClient()
zkclient.del_node()
time.sleep(2)
print_usage(){
printf $"USAGE:$0 rootNode\n"
exit 1
}
if [ $# -ne 1 ];then
print_usage
fi
rootNode=$1
BIN_DIR=`dirname $0`
BIN_DIR=`cd "$BIN_DIR"; pwd`
DOLPHINSCHEDULER_HOME=$BIN_DIR/..
export JAVA_HOME=$JAVA_HOME
export DOLPHINSCHEDULER_CONF_DIR=$DOLPHINSCHEDULER_HOME/conf
export DOLPHINSCHEDULER_LIB_JARS=$DOLPHINSCHEDULER_HOME/lib/*
export DOLPHINSCHEDULER_OPTS="-server -Xmx1g -Xms1g -Xss512k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70"
export STOP_TIMEOUT=5
CLASS=org.apache.dolphinscheduler.server.utils.RemoveZKNode
exec_command="$DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS $rootNode"
cd $DOLPHINSCHEDULER_HOME
$JAVA_HOME/bin/java $exec_command
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册