提交 f8ce2be1 编写于 作者: W wangjiawei04

First Commit

Co-authored-by: NWang Guibao <wangguibao@baidu.com>
Co-authored-by: Nsuoych <suoych@163.com>
上级 e126b86a
# data.config — environment settings sourced by elastic-control.sh
# (see DATA_CONF_PATH / the -f|--datafile option).
# Address of the HDFS namenode the training job reads from.
export HDFS_ADDRESS="hdfs://192.168.48.87:9000"
# HDFS user/group info; presumably "user,password" form — TODO confirm.
export HDFS_UGI="root,i"
# Time window of training data to consume, format YYYYMMDD/HH.
export START_DATE_HR=20191205/00
export END_DATE_HR=20191205/00
# Path of the dataset inside HDFS.
export DATASET_PATH="/train_data"
# Size of the sparse feature space (embedding table rows).
export SPARSE_DIM="1000001"
#!/bin/bash
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Function definitions
###############################################################################
# Print command-line usage for elastic-control.sh to stdout.
# (Fixed user-facing typos: "requirments" -> "requirements", "bellow" -> "below".)
help()
{
    echo "Usage: sh elastic-control.sh [COMMAND] [OPTIONS]"
    echo "elastic-control is command line interface with ELASTIC CTR"
    echo ""
    echo "Commands:"
    echo "-r|--config_resource Configure training resource requirements. See below"
    echo "-a|--apply Apply configurations to start training process"
    echo "-l|--log Log the status of training, please make sure you have started the training process"
    echo "-c|--config_client Retrieve client binaries to send infer requests and receive results"
    echo ""
    echo "Options(Used only for --config_resource):"
    echo "-u|--cpu CPU cores for each training node (Unused for now)"
    echo "-m|--mem Memory for each training node (Unused for now)"
    echo "-t|--trainer Number of trainer nodes"
    echo "-p|--pserver Number of parameter-server nodes"
    echo "-b|--cube Number of cube shards"
    echo "-f|--datafile Data file path (Only HDFS supported) (Unused for now)"
    echo "-s|--slot_conf Slot config file"
    echo ""
    echo "Example:"
    echo "sh elastic-control.sh -r -u 4 -m 20 -t 2 -p 2 -b 5 -s slot.conf -f data.config"
    echo "sh elastic-control.sh -a"
    echo "sh elastic-control.sh -c"
    echo ""
    echo "Notes:"
    echo "Slot Config File: Specify which feature ids are used in training. One number per line."
}
# Print a failure message and abort the whole script with status 1.
# Args: $1 - message to report.
# (Fixed: diagnostics now go to stderr, not stdout, so they are not
# captured by command substitution or pipelines.)
die()
{
    echo "[FAILED] ${1}" >&2
    exit 1
}
# Report a successful step to stdout.
# Args: $1 - message describing what succeeded.
ok()
{
    printf '[OK] %s\n' "${1}"
}
# Verify that each named command is available in the current shell;
# die (exit 1) on the first missing one.
# Args: one or more command names.
# Returns: 0 when all commands exist, 1 on usage error (no arguments).
check_tools()
{
    if [ $# -lt 1 ]; then
        echo "Usage: check_tools COMMAND [COMMAND...]"
        return 1  # fixed: misuse used to return 0 (success)
    fi
    while [ $# -ge 1 ]; do
        # 'type' succeeds iff the name resolves (builtin, function or binary)
        type "$1" &>/dev/null || die "$1 is needed but not found. Aborting..."
        shift
    done
    return 0
}
# Verify that each argument is an existing regular file; die on the first
# missing one.
# Args: one or more file paths.
# Returns: 0 when all files exist, 1 on usage error (no arguments).
function check_files()
{
    if [ $# -lt 1 ]; then
        echo "Usage: check_files FILE [FILE...]"  # fixed copy-pasted "COMMAND" wording
        return 1  # fixed: misuse used to return 0 (success)
    fi
    while [ $# -ge 1 ]; do
        [ -f "$1" ] || die "$1 does not exist"
        shift
    done
    return 0
}
# Start (or restart) the in-cluster file-server Pod defined by fileserver.yaml.
# If a file-server pod already exists it is deleted first so the fresh
# configuration takes effect.
function start_fileserver()
{
    # Proxies would break direct kubectl <-> apiserver traffic.
    unset http_proxy
    unset https_proxy
    if kubectl get pod | grep file-server >/dev/null 2>&1; then
        echo "delete duplicate file server..."
        kubectl delete -f fileserver.yaml
        kubectl apply -f fileserver.yaml
    else
        kubectl apply -f fileserver.yaml
    fi
}
# Install the Volcano batch scheduler into the cluster, but only when its
# Job CRD (jobs.batch.volcano.sh) is not already registered.
function install_volcano() {
    # Proxies would break direct kubectl <-> apiserver traffic.
    unset http_proxy
    unset https_proxy
    if ! kubectl get crds | grep jobs.batch.volcano.sh >/dev/null 2>&1; then
        echo "volcano not found, now install"
        kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml
    fi
}
# Download the prebuilt ELASTIC CTR client package, discover the external
# addresses of the paddleserving and file-server Services, and render the
# client configuration so the user can send inference requests.
# Requires: wget, kubectl; a running cluster with both Services deployed.
function config_client()
{
check_tools wget kubectl
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/data/ctr_prediction/elastic_ctr_client_million.tar.gz
tar zxvf elastic_ctr_client_million.tar.gz
rm elastic_ctr_client_million.tar.gz
# Poll up to 20 times (10s apart) until the LoadBalancer assigns an
# external IP to the paddleserving Service ("<pending>" until then).
for number in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do
SERVING_IP=`kubectl get services | grep 'paddleserving' | awk '{print $4}'`
echo "searching Paddle Serving external IP, wait a moment."
if [ "${SERVING_IP}" == "<pending>" ]; then
sleep 10
else
break
fi
done
# NOTE(review): unlike upload_slot_conf, there is no abort when the IP is
# still "<pending>" after the loop — the script proceeds with a bad address.
SERVING_IP=`kubectl get services | grep 'paddleserving' | awk '{print $4}'`
# Column 5 is "port:nodePort/proto"; keep only the service port.
SERVING_PORT=`kubectl get services | grep 'paddleserving' | awk '{print $5}' | awk -F':' '{print $1}'`
SERVING_ADDR="$SERVING_IP:$SERVING_PORT"
# Substitute the serving endpoint into the client prediction config template.
sed -e "s#<$ SERVING_LIST $>#$SERVING_ADDR#g" client/template/predictors.prototxt.template > client/conf/predictors.prototxt
FILESERVER_IP=`kubectl get services | grep 'file-server' | awk '{print $4}'`
FILESERVER_PORT=`kubectl get services | grep 'file-server' | awk '{print $5}' | awk -F':' '{print $1}'`
# Fetch the slot config previously uploaded via upload_slot_conf.
wget http://$FILESERVER_IP:$FILESERVER_PORT/slot.conf -O client/conf/slot.conf
cp api/lib/*.so client/bin/
echo "Done."
echo "============================================"
echo ""
echo "Try ELASTIC CTR:"
echo "1. cd client"
echo "2. (python) python bin/elastic_ctr.py $SERVING_IP $SERVING_PORT conf/slot.conf data/ctr_prediction/data.txt"
echo "3. (C++ native) bin/elastic_ctr_demo --test_file data/ctr_prediction/data.txt"
return 0
}
# Generate cube.yaml (one Pod + Service per cube shard) and transfer.yaml
# (the cube-transfer Pod) for the distributed sparse-parameter KV store.
# Args: $1 - number of cube shards (integer >= 1).
# Returns: 0 on success, 1 on invalid arguments.
# (Fixed: 'return -1' is not a valid bash return status; use 1.)
# NOTE(review): the nesting indentation inside the echoed YAML strings looks
# collapsed to a single space by text extraction — restore from VCS.
function generate_cube_yaml()
{
    if [ $# -ne 1 ]; then
        echo "Invalid argument to function generate_cube_yaml"
        return 1
    fi
    if [ "$1" -lt 1 ]; then
        echo "CUBE_SHARD_NUM must not be less than 1"
        return 1
    fi
    CNT=$(($1-1))
    CUBE_SHARD_NUM=$1
    # One Pod + headless-style Service per shard, all appended to cube.yaml.
    for i in `seq 0 $CNT`; do
        echo "---"
        echo "apiVersion: v1"
        echo "kind: Pod"
        echo "metadata:"
        echo " name: cube-$i"
        echo " labels:"
        echo " app: cube-$i"
        echo "spec:"
        echo " containers:"
        echo " - name: cube-$i"
        echo " image: hub.baidubce.com/ctr/cube:v1"
        echo " workingDir: /cube"
        echo " command: ['/bin/bash']"
        echo " args: ['start.sh']"
        echo " env:"
        echo " - name: CUBE_SHARD_NUM"
        echo " value: \"$CUBE_SHARD_NUM\""
        echo " ports:"
        echo " - containerPort: 8001"
        echo " name: cube-agent"
        echo " - containerPort: 8027"
        echo " name: cube-server"
        echo "---"
        echo "kind: Service"
        echo "apiVersion: v1"
        echo "metadata:"
        echo " name: cube-$i"
        echo "spec:"
        echo " ports:"
        echo " - name: agent"
        echo " port: 8001"
        echo " protocol: TCP"
        echo " - name: server"
        echo " port: 8027"
        echo " protocol: TCP"
        echo " selector:"
        echo " app: cube-$i"
    done > cube.yaml
    # Single cube-transfer Pod that feeds model data into the shards.
    {
        echo "apiVersion: v1"
        echo "kind: Pod"
        echo "metadata:"
        echo " name: cube-transfer"
        echo " labels:"
        echo " app: cube-transfer"
        echo "spec:"
        echo " containers:"
        echo " - name: cube-transfer"
        echo " image: hub.baidubce.com/ctr/cube-transfer:v1"
        echo " workingDir: /"
        echo " env:"
        echo " - name: POD_IP"
        echo " valueFrom:"
        echo " fieldRef:"
        echo " apiVersion: v1"
        echo " fieldPath: status.podIP"
        echo " - name: CUBE_SHARD_NUM"
        echo " value: \"$CUBE_SHARD_NUM\""
        echo " command: ['bash']"
        echo " args: ['nonstop.sh']"
        echo " ports:"
        echo " - containerPort: 8099"
        echo " name: cube-transfer"
        echo " - containerPort: 8098"
        echo " name: cube-http"
    } > transfer.yaml
    echo "cube.yaml written to ./cube.yaml"
    echo "transfer.yaml written to ./transfer.yaml"
    return 0
}
# Render fileserver.yaml from fileserver.yaml.template by substituting the
# HDFS connection settings into the <$ NAME $> placeholders.
# Args: $1 - HDFS address (e.g. hdfs://host:9000), $2 - HDFS ugi string.
# Returns: 0 on success, 1 on invalid arguments.
# (Fixed: 'return -1' is not a valid bash return status; use 1.)
function generate_fileserver_yaml()
{
    check_tools sed
    check_files fileserver.yaml.template
    if [ $# -ne 2 ]; then
        echo "Invalid argument to function generate_fileserver_yaml"
        return 1
    else
        hdfs_address=$1
        hdfs_ugi=$2
        sed -e "s#<$ HDFS_ADDRESS $>#$hdfs_address#g" \
            -e "s#<$ HDFS_UGI $>#$hdfs_ugi#g" \
            fileserver.yaml.template > fileserver.yaml
        echo "File server yaml written to fileserver.yaml"
    fi
    return 0
}
# Render fleet-ctr.yaml (the Volcano training job) from fleet-ctr.yaml.template
# by substituting all <$ NAME $> placeholders.
# Args (11, in order): pserver_num trainer_num cpu mem data_path hdfs_address
#                      hdfs_ugi start_date_hr end_date_hr sparse_dim dataset_path
# Returns: 0 on success, 1 on invalid arguments.
# (Fixed: 'return -1' is not a valid bash return status; replaced the
# deprecated 'let' with $(( )) arithmetic.)
function generate_yaml()
{
    check_tools sed
    check_files fleet-ctr.yaml.template
    if [ $# -ne 11 ]; then
        echo "Invalid argument to function generate_yaml"
        return 1
    else
        pserver_num=$1
        total_trainer_num=$2
        slave_trainer_num=$((total_trainer_num))
        # Volcano's minAvailable must cover every pserver and trainer pod.
        total_pod_num=$((total_trainer_num + pserver_num))
        cpu_num=$3
        mem=$4
        data_path=$5
        hdfs_address=$6
        hdfs_ugi=$7
        start_date_hr=$8
        end_date_hr=$9
        sparse_dim=${10}
        dataset_path=${11}
        sed -e "s#<$ PSERVER_NUM $>#$pserver_num#g" \
            -e "s#<$ TRAINER_NUM $>#$total_trainer_num#g" \
            -e "s#<$ SLAVE_TRAINER_NUM $>#$slave_trainer_num#g" \
            -e "s#<$ CPU_NUM $>#$cpu_num#g" \
            -e "s#<$ MEMORY $>#$mem#g" \
            -e "s#<$ DATASET_PATH $>#$dataset_path#g" \
            -e "s#<$ SPARSE_DIM $>#$sparse_dim#g" \
            -e "s#<$ HDFS_ADDRESS $>#$hdfs_address#g" \
            -e "s#<$ HDFS_UGI $>#$hdfs_ugi#g" \
            -e "s#<$ START_DATE_HR $>#$start_date_hr#g" \
            -e "s#<$ END_DATE_HR $>#$end_date_hr#g" \
            -e "s#<$ TOTAL_POD_NUM $>#$total_pod_num#g" \
            fleet-ctr.yaml.template > fleet-ctr.yaml
        echo "Main yaml written to fleet-ctr.yaml"
    fi
    return 0
}
# Upload the slot configuration file to the in-cluster file server so
# trainers and clients can fetch it as /slot.conf.
# Args: $1 - path to the slot config file.
# Returns: 0 on success, 1 when the file server never gets an external IP.
function upload_slot_conf()
{
    check_tools kubectl curl
    if [ $# -ne 1 ]; then
        die "upload_slot_conf: Slot conf file not specified"
    fi
    check_files "$1"
    echo "start file-server pod"
    start_fileserver
    # Poll up to 20 times (10s apart) until the LoadBalancer assigns an
    # external IP ("<pending>" until then).
    for number in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do
        FILESERVER_IP=`kubectl get services | grep 'file-server' | awk '{print $4}'`
        echo "searching file-server external IP, wait a moment."
        if [ "${FILESERVER_IP}" == "<pending>" ]; then
            sleep 10
        else
            break
        fi
    done
    if [ "${FILESERVER_IP}" == "<pending>" ]; then
        echo "error in K8S cluster, cannot continue. Aborted"
        return 1
    fi
    FILESERVER_IP=`kubectl get services | grep 'file-server' | awk '{print $4}'`
    # The service exposes two ports ("8080:...,9000:..."); take the upload one.
    FILESERVER_PORT=`kubectl get services | grep 'file-server' | awk '{print $5}' | awk -F':' '{print $2}' | awk -F',' '{print $2}'`
    # Warn when the slot file does not use the expected '.txt' suffix.
    # (Fixed: original tested an unset $file instead of $1 and had the
    # condition inverted, warning exactly when the suffix WAS '.txt'.)
    if [ "${1##*.}" != "txt" ];
    then
        echo "slot file suffix must be '.txt'"
    fi
    echo "curl --upload-file $1 $FILESERVER_IP:$FILESERVER_PORT"
    curl --upload-file "$1" $FILESERVER_IP:$FILESERVER_PORT
    if [ $? == 0 ]; then
        echo "File $1 uploaded to $FILESERVER_IP:$FILESERVER_PORT/slot.conf"
    fi
    return 0
}
# Render every deployment yaml from the current resource settings and upload
# the slot config. Reads globals: CPU MEM CUBE TRAINER PSERVER DATA_PATH
# SLOT_CONF VERBOSE HDFS_ADDRESS HDFS_UGI START_DATE_HR END_DATE_HR
# SPARSE_DIM DATASET_PATH. Dies on the first failing step.
function config_resource()
{
# Echo the effective configuration so the user can sanity-check it.
echo "CPU=$CPU MEM=$MEM CUBE=$CUBE TRAINER=$TRAINER PSERVER=$PSERVER "\
"CUBE=$CUBE DATA_PATH=$DATA_PATH SLOT_CONF=$SLOT_CONF VERBOSE=$VERBOSE "\
"HDFS_ADDRESS=$HDFS_ADDRESS HDFS_UGI=$HDFS_UGI START_DATE_HR=$START_DATE_HR END_DATE_HR=$END_DATE_HR "\
"SPARSE_DIM=$SPARSE_DIM DATASET_PATH=$DATASET_PATH "
generate_cube_yaml $CUBE || die "config_resource: generate_cube_yaml failed"
generate_fileserver_yaml $HDFS_ADDRESS $HDFS_UGI || die "config_resource: generate_fileserver_yaml failed"
generate_yaml $PSERVER $TRAINER $CPU $MEM $DATA_PATH $HDFS_ADDRESS $HDFS_UGI $START_DATE_HR $END_DATE_HR $SPARSE_DIM $DATASET_PATH || die "config_resource: generate_yaml failed"
upload_slot_conf $SLOT_CONF || die "config_resource: upload_slot_conf failed"
return 0
}
# Show the tail of each component's log: trainer 0, file server,
# cube-transfer and Paddle Serving. Each log is also saved to a local file.
# (Fixed: plain echo printed a literal "\n"; '[ -f ]' was always true because
# the redirection creates the file, so test for non-empty content with -s;
# "Padddle" typo.)
function log()
{
    echo "Trainer 0 Log:"
    kubectl logs fleet-ctr-demo-trainer-0 | grep __main__ > train.log
    if [ -s train.log ]; then
        tail -n 20 train.log
    else
        echo "Trainer Log Has not been generated"
    fi
    printf '\nFile Server Log:\n'
    kubectl logs file-server | grep __main__ > file-server.log
    if [ -s file-server.log ]; then
        tail -n 20 file-server.log
    else
        echo "File Server Log Has not been generated"
    fi
    printf '\nCube Transfer Log:\n'
    kubectl logs cube-transfer | grep "all reload ok" > cube-transfer.log
    if [ -s cube-transfer.log ]; then
        tail -n 20 cube-transfer.log
    else
        echo "Cube Transfer Log Has not been generated"
    fi
    printf '\nPaddle Serving Log:\n'
    kubectl logs paddleserving | grep INFO > paddleserving.log
}
# Load data-related environment variables (HDFS_ADDRESS, HDFS_UGI, the date
# window, DATASET_PATH, SPARSE_DIM) from the user-supplied config file.
# Reads global: DATA_CONF_PATH.
# (Fixed: quoted the path so it survives spaces/globs.)
datafile_config()
{
    source "$DATA_CONF_PATH"
}
# Deploy (or redeploy) the whole training stack: ensure Volcano is installed,
# recreate the cube/transfer/serving pods, then (re)submit the Volcano job
# rendered by generate_yaml.
function apply()
{
check_tools kubectl
install_volcano
# Remove stale cube and serving pods; errors (e.g. nothing to delete on the
# first deploy) are deliberately silenced.
kubectl get pod | grep cube | awk {'print $1'} | xargs kubectl delete pod >/dev/null 2>&1
kubectl get pod | grep paddleserving | awk {'print $1'} | xargs kubectl delete pod >/dev/null 2>&1
kubectl apply -f cube.yaml
kubectl apply -f transfer.yaml
kubectl apply -f pdserving.yaml
# A previous job with the same name must be deleted before re-applying,
# since Volcano jobs are not updated in place.
kubectl get jobs.batch.volcano.sh | grep fleet-ctr-demo
if [ $? == 0 ]; then
kubectl delete jobs.batch.volcano.sh fleet-ctr-demo
fi
kubectl apply -f fleet-ctr.yaml
return
}
###############################################################################
# Main logic begin
###############################################################################
# Default resource settings; overridable via command-line options below.
CMD=""
CPU=4
MEM=4
CUBE=2
TRAINER=2
PSERVER=2
DATA_PATH="/app"
SLOT_CONF="./slot.conf"
VERBOSE=0
DATA_CONF_PATH="data.config"
# Load HDFS/data settings from the default config file up front;
# -f|--datafile may re-source a different file during option parsing.
source $DATA_CONF_PATH
# Parse arguments
# NOTE(review): '--longoption' only works because GNU getopt accepts
# unambiguous abbreviations of '--longoptions'; spell it out.
# NOTE(review): the short spec declares 'v:' (argument required) but the
# -v|--verbose handler below consumes no argument — 'v' was likely intended.
TEMP=`getopt -n elastic-control -o crahu:m:t:p:b:f:s:v:l --longoption config_client,config_resource,apply,help,cpu:,mem:,trainer:,pserver:,cube:,datafile:,slot_conf:,verbose,log -- "$@"`
# Die if they fat finger arguments, this program will be run as root
[ $? = 0 ] || die "Error parsing arguments. Try $0 --help"
eval set -- "$TEMP"
# First pass: record the command and collect option values; the selected
# command runs after parsing (except -h/-l which act immediately).
while true; do
case $1 in
-c|--config_client)
CMD="config_client"; shift; continue
;;
-r|--config_resource)
CMD="config_resource"; shift; continue
;;
-a|--apply)
CMD="apply"; shift; continue
;;
-h|--help)
help
exit 0
;;
-l|--log)
log; shift;
exit 0
;;
-u|--cpu)
CPU="$2"; shift; shift; continue
;;
-m|--mem)
MEM="$2"; shift; shift; continue
;;
-t|--trainer)
TRAINER="$2"; shift; shift; continue
;;
-p|--pserver)
PSERVER="$2"; shift; shift; continue
;;
-b|--cube)
CUBE="$2"; shift; shift; continue
;;
-f|--datafile)
# Re-source data settings from the user-provided config file immediately.
DATA_CONF_PATH="$2"; datafile_config ; shift; shift; continue
;;
-s|--slot_conf)
SLOT_CONF="$2"; shift; shift; continue
;;
-v|--verbose)
VERBOSE=1; shift; continue
;;
--)
# no more arguments to parse
break
;;
*)
printf "Unknown option %s\n" "$1"
exit 1
;;
esac
done
# Dispatch the selected command; unknown/absent commands print usage.
case $CMD in
config_resource)
config_resource
;;
config_client)
config_client
;;
apply)
apply
;;
status)
# NOTE(review): no 'status' function is defined anywhere in this script,
# and no option sets CMD="status" — this branch is dead/broken.
status
;;
*)
help
;;
esac
# fileserver.yaml.template — file-server Pod + LoadBalancer Service used to
# distribute slot.conf and other artifacts inside the cluster.
# The <$ NAME $> markers are replaced by generate_fileserver_yaml.
# NOTE(review): YAML nesting indentation appears to have been stripped by the
# text extraction this file came from; restore it from version control before use.
apiVersion: v1
kind: Pod
metadata:
name: file-server
labels:
app: file-server
spec:
containers:
- name: file-server
image: hub.baidubce.com/ctr/file-server:hdfs7
ports:
- containerPort: 8080
command: ['bash']
args: ['run.sh']
env:
# Java/Hadoop are needed so run.sh can reach HDFS.
- name: JAVA_HOME
value: /usr/local/jdk1.8.0_231
- name: HADOOP_HOME
value: /usr/local/hadoop-2.8.5
- name: HDFS_ADDRESS
value: "<$ HDFS_ADDRESS $>"
- name: HDFS_UGI
value: "<$ HDFS_UGI $>"
- name: PATH
value: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_231/bin:/usr/local/hadoop-2.8.5/bin:/Python-3.7.0:/node-v12.13.1-linux-x64/bin
---
# LoadBalancer exposing the download port (8080) and the upload port (9000)
# that upload_slot_conf targets.
kind: Service
apiVersion: v1
metadata:
name: file-server
spec:
type: LoadBalancer
ports:
- name: file-server
port: 8080
targetPort: 8080
- name: upload
port: 9000
targetPort: 9000
selector:
app: file-server
# fleet-ctr.yaml.template — Volcano Job running the distributed CTR training:
# one task group of parameter servers and one of trainers, plus a Service
# exposing the mlflow UI. The <$ NAME $> markers are filled by generate_yaml.
# NOTE(review): YAML nesting indentation appears to have been stripped by the
# text extraction this file came from; restore it from version control before use.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: fleet-ctr-demo
spec:
# Gang scheduling: do not start until all pserver+trainer pods fit.
minAvailable: <$ TOTAL_POD_NUM $>
schedulerName: volcano
policies:
- event: PodEvicted
action: RestartJob
- event: PodFailed
action: RestartJob
tasks:
# ---- parameter-server task group ----
- replicas: <$ PSERVER_NUM $>
name: pserver
template:
metadata:
labels:
paddle-job-pserver: fluid-ctr
spec:
imagePullSecrets:
- name: default-secret
containers:
- image: hub.baidubce.com/ctr/fleet-ctr:83
command:
- paddle_k8s
- start_fluid
imagePullPolicy: IfNotPresent
# NOTE(review): "preserver" looks like a typo for "pserver".
name: preserver
resources:
limits:
cpu: 10
memory: 30Gi
ephemeral-storage: 10Gi
requests:
cpu: 1
memory: 100M
ephemeral-storage: 1Gi
env:
- name: GLOG_v
value: "0"
- name: GLOG_logtostderr
value: "1"
- name: TOPOLOGY
value: ""
- name: TRAINER_PACKAGE
value: /workspace
- name: PADDLE_INIT_NICS
value: eth2
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: PADDLE_CURRENT_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: PADDLE_JOB_NAME
value: fluid-ctr
- name: PADDLE_IS_LOCAL
value: "0"
- name: PADDLE_TRAINERS_NUM
value: "<$ TRAINER_NUM $>"
- name: PADDLE_PSERVERS_NUM
value: "<$ PSERVER_NUM $>"
- name: FLAGS_rpc_deadline
value: "36000000"
- name: ENTRY
value: cd workspace && python3 train_with_mlflow.py slot.conf
- name: PADDLE_PORT
value: "30240"
- name: HDFS_ADDRESS
value: "<$ HDFS_ADDRESS $>"
- name: HDFS_UGI
value: "<$ HDFS_UGI $>"
- name: START_DATE_HR
value: <$ START_DATE_HR $>
- name: END_DATE_HR
value: <$ END_DATE_HR $>
- name: DATASET_PATH
value: "<$ DATASET_PATH $>"
- name: SPARSE_DIM
value: "<$ SPARSE_DIM $>"
- name: PADDLE_TRAINING_ROLE
value: PSERVER
- name: TRAINING_ROLE
value: PSERVER
restartPolicy: OnFailure
# ---- trainer task group; completing it completes the whole job ----
- replicas: <$ TRAINER_NUM $>
policies:
- event: TaskCompleted
action: CompleteJob
name: trainer
template:
metadata:
labels:
paddle-job: fluid-ctr
app: mlflow
spec:
imagePullSecrets:
- name: default-secret
containers:
- image: hub.baidubce.com/ctr/fleet-ctr:83
command:
- paddle_k8s
- start_fluid
imagePullPolicy: IfNotPresent
name: trainer
resources:
limits:
cpu: 10
memory: 30Gi
ephemeral-storage: 10Gi
requests:
cpu: 1
memory: 100M
ephemeral-storage: 10Gi
env:
- name: GLOG_v
value: "0"
- name: GLOG_logtostderr
value: "1"
- name: TRAINER_PACKAGE
value: /workspace
- name: PADDLE_INIT_NICS
value: eth2
- name: CPU_NUM
value: "2"
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: PADDLE_CURRENT_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: PADDLE_JOB_NAME
value: fluid-ctr
- name: PADDLE_IS_LOCAL
value: "0"
- name: FLAGS_rpc_deadline
value: "36000000"
# NOTE(review): unlike the pserver task, these two counts are hardcoded to
# "2" instead of using <$ PSERVER_NUM $>/<$ TRAINER_NUM $> — they will
# disagree with the rendered replica counts whenever -t/-p != 2.
- name: PADDLE_PSERVERS_NUM
value: "2"
- name: PADDLE_TRAINERS_NUM
value: "2"
- name: PADDLE_PORT
value: "30240"
- name: PADDLE_TRAINING_ROLE
value: TRAINER
- name: TRAINING_ROLE
value: TRAINER
- name: HDFS_ADDRESS
value: "<$ HDFS_ADDRESS $>"
- name: HDFS_UGI
value: "<$ HDFS_UGI $>"
- name: START_DATE_HR
value: <$ START_DATE_HR $>
- name: END_DATE_HR
value: <$ END_DATE_HR $>
- name: DATASET_PATH
value: "<$ DATASET_PATH $>"
- name: SPARSE_DIM
value: "<$ SPARSE_DIM $>"
- name: JAVA_HOME
value: /usr/local/jdk1.8.0_231
- name: HADOOP_HOME
value: /usr/local/hadoop-2.8.5
- name: PATH
value: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_231/bin:/usr/local/hadoop-2.8.5/bin:/Python-3.7.0
# Trainer also starts the mlflow tracking server in the background.
- name: ENTRY
value: cd workspace && (bash mlflow_run.sh &) && python3 train_with_mlflow.py slot.conf
restartPolicy: OnFailure
---
# Exposes the mlflow UI (port 8111) served from the trainer pods.
kind: Service
apiVersion: v1
metadata:
name: mlflow
spec:
type: LoadBalancer
ports:
- name: mlflow
port: 8111
targetPort: 8111
selector:
app: mlflow
import time
import os

# Continuously mirror the trainer pod's mlflow run data to the local
# filesystem, and start a local mlflow tracking server (port 8111) once the
# first batch of run data has been copied.
need_server_start = True
while True:
    # Pull the latest mlflow artifacts out of trainer 0.
    os.system("kubectl cp fleet-ctr-demo-trainer-0:workspace/mlruns ./mlruns")
    # Launch the mlflow UI exactly once, as soon as data is available.
    if need_server_start and os.path.exists("./mlruns"):
        os.system("mlflow server --default-artifact-root ./mlruns/0 --host 0.0.0.0 --port 8111 &")
        need_server_start = False
    time.sleep(30)
# pdserving.yaml — Paddle Serving inference Pod + LoadBalancer Service
# (port 8010) that the generated client configuration points at.
# NOTE(review): YAML nesting indentation appears to have been stripped by the
# text extraction this file came from; restore it from version control before use.
apiVersion: v1
kind: Pod
metadata:
name: paddleserving
labels:
app: paddleserving
spec:
containers:
- name: paddleserving
image: hub.baidubce.com/ctr/paddleserving:test_elastic_ctr_1
#image: hub.baidubce.com/ctr/paddleserving:latest
workingDir: /serving
command: ['/bin/bash']
args: ['run.sh']
#command: ['sleep']
#args: ['1000000']
ports:
- containerPort: 8010
name: serving
---
apiVersion: v1
kind: Service
metadata:
name: paddleserving
spec:
type: LoadBalancer
ports:
- name: paddleserving
port: 8010
targetPort: 8010
selector:
app: paddleserving
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册