Unverified · commit fca1c52d, authored by Jinhua Liang, committed by GitHub

Engine clean (#92)

* add cluster running

* add k8s/paddle cloud

* fix local cluster without gpu

* define engine to train/infer/local_cluster_train/cluster_train

* fix gpu cluster wait

* fix bug for 2ps 2tr
Co-authored-by: tangwei <tangwei12@baidu.com>
Parent 8270dd0c
......@@ -29,8 +29,13 @@ class ClusterEngine(Engine):
abs_dir = os.path.dirname(os.path.abspath(__file__))
backend = envs.get_runtime_environ("engine_backend")
if backend == "PaddleCloud":
if not backend:
backend = ""
backend = backend.upper()
if backend == "PADDLECLOUD":
self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh")
elif backend == "KUBERNETES":
self.submit_script = os.path.join(abs_dir, "k8s/cluster.sh")
else:
raise ValueError("{} can not be supported now".format(backend))
......@@ -48,6 +53,14 @@ class ClusterEngine(Engine):
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
proc.wait()
@staticmethod
def workspace_replace():
workspace = envs.get_runtime_environ("engine_workspace")
for k, v in os.environ.items():
v = v.replace("{workspace}", workspace)
os.environ[k] = str(v)
def run(self):
role = envs.get_runtime_environ("engine_role")
......
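For reference, a minimal standalone sketch of the backend resolution and `{workspace}` substitution introduced above (the `envs` helper is replaced by plain `os.environ` here; everything else mirrors the diff):

```python
import os

def resolve_submit_script(abs_dir, backend):
    # Normalize the backend so "PaddleCloud", "paddlecloud", etc. all match.
    backend = (backend or "").upper()
    if backend == "PADDLECLOUD":
        return os.path.join(abs_dir, "cloud/cluster.sh")
    if backend == "KUBERNETES":
        return os.path.join(abs_dir, "k8s/cluster.sh")
    raise ValueError("{} is not supported".format(backend))

def workspace_replace(workspace):
    # Expand the "{workspace}" placeholder in every environment variable,
    # as ClusterEngine.workspace_replace does before submitting.
    for k, v in os.environ.items():
        os.environ[k] = v.replace("{workspace}", workspace)
```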
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash
###################################################
# Usage: submit.sh
# Description: run the k8s submit client implementation
###################################################
# ---------------------------------------------------------------------------- #
# variable define #
# ---------------------------------------------------------------------------- #
#-----------------------------------------------------------------------------------------------------------------
#fun : create ConfigMap for k8s pod
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function create_config_map() {
echo "Create configmap"
echo "Delete exist configmap which named 'modelconfig'"
kubectl delete configmap modelconfig
kubectl create configmap modelconfig --from-file=${abs_dir}/k8s/set_k8s_env.sh,${paddlerec_model_config}
}
#-----------------------------------------------------------------------------------------------------------------
#fun : create yaml config for k8s job
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function create_k8s_yaml() {
echo "Create k8s.yaml"
if [ -f ${PWD}/k8s.yaml ]; then
echo "Delete exist k8s.yaml at ${PWD}"
rm ${PWD}/k8s.yaml
fi
let total_pod_num=${engine_submit_trainer_num}+${engine_submit_server_num}
echo "--K8S ENV-- Job name: ${engine_job_name}"
echo "--K8S ENV-- Total pod nums: ${total_pod_num}"
echo "--K8S ENV-- Trainer nums: ${engine_submit_trainer_num}"
echo "--K8S ENV-- Pserver nums: ${engine_submit_server_num}"
echo "--K8S ENV-- Docker image: ${engine_submit_docker_image}"
echo "--K8S ENV-- Threads(cpu_num) ${CPU_NUM}"
echo "--K8S ENV-- Memory ${engine_submit_memory}"
echo "--K8S ENV-- Storage ${engine_submit_storage}"
echo "--K8S ENV-- Log level ${engine_submit_log_level}"
sed -e "s#<$ JOB_NAME $>#$engine_job_name#g" \
-e "s#<$ TOTAL_POD_NUM $>#$total_pod_num#g" \
-e "s#<$ TRAINER_NUM $>#$engine_submit_trainer_num#g" \
-e "s#<$ PSERVER_NUM $>#$engine_submit_server_num#g" \
-e "s#<$ IMAGE $>#$engine_submit_docker_image#g" \
-e "s#<$ CPU_NUM $>#$CPU_NUM#g" \
-e "s#<$ MEMORY $>#$engine_submit_memory#g" \
-e "s#<$ STORAGE $>#$engine_submit_storage#g" \
-e "s#<$ GLOG_V $>#$engine_submit_log_level#g" \
${abs_dir}/k8s/k8s.yaml.template >${PWD}/k8s.yaml
}
#-----------------------------------------------------------------------------------------------------------------
#fun : submit to k8s cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function submit() {
echo "Submit"
echo "Delete exist job which named ${engine_job_name}"
kubectl delete jobs.batch.volcano.sh $engine_job_name
kubectl apply -f ${PWD}/k8s.yaml
}
function main() {
create_config_map
create_k8s_yaml
submit
}
main
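The sed pipeline in `create_k8s_yaml` only renders the `<$ NAME $>` placeholders in k8s.yaml.template. A hedged Python equivalent of that substitution, with hypothetical values for the placeholders:

```python
import re

def render_template(template_path, out_path, values):
    """Replace every <$ NAME $> placeholder with values["NAME"]."""
    with open(template_path) as f:
        text = f.read()
    text = re.sub(r"<\$ (\w+) \$>", lambda m: str(values[m.group(1)]), text)
    with open(out_path, "w") as f:
        f.write(text)

# Hypothetical values matching the placeholders in k8s.yaml.template.
render_template("k8s.yaml.template", "k8s.yaml", {
    "JOB_NAME": "paddle-rec-job", "TOTAL_POD_NUM": 4,
    "TRAINER_NUM": 2, "PSERVER_NUM": 2, "IMAGE": "paddle:latest",
    "CPU_NUM": 2, "MEMORY": "8Gi", "STORAGE": "10Gi", "GLOG_V": 0,
})
```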
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: <$ JOB_NAME $>
spec:
minAvailable: <$ TOTAL_POD_NUM $>
schedulerName: volcano
policies:
- event: PodEvicted
action: RestartJob
- event: PodFailed
action: RestartJob
tasks:
- replicas: <$ PSERVER_NUM $>
name: pserver
template:
metadata:
labels:
paddle-job-pserver: paddle-rec
spec:
imagePullSecrets:
- name: default-secret
containers:
- image: <$ IMAGE $>
command:
- '/bin/bash'
args:
- "-c"
- |
set -ex
sh /usr/paddlerec/set_k8s_env.sh start_fluid
imagePullPolicy: Always
volumeMounts:
- name: model-config
mountPath: "/usr/paddlerec"
name: pserver
resources:
limits:
cpu: <$ CPU_NUM $>
memory: <$ MEMORY $>
ephemeral-storage: <$ STORAGE $>
requests:
cpu: 1
memory: 1Gi
ephemeral-storage: 1Gi
env:
- name: GLOG_v
value: "<$ GLOG_V $>"
- name: GLOG_logtostderr
value: "1"
- name: TOPOLOGY
value: ""
- name: TRAINER_PACKAGE
value: /root/paddlejob
- name: PADDLE_INIT_NICS
value: eth2
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: PADDLE_CURRENT_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: PADDLE_JOB_NAME
value: paddle-rec
- name: PADDLE_IS_LOCAL
value: "0"
- name: PADDLE_TRAINERS_NUM
value: "<$ TRAINER_NUM $>"
- name: PADDLE_PSERVERS_NUM
value: "<$ PSERVER_NUM $>"
- name: FLAGS_rpc_deadline
value: "100000"
- name: ENTRY
value: python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER
- name: PADDLE_PORT
value: "30240"
- name: PADDLE_TRAINING_ROLE
value: PSERVER
- name: TRAINING_ROLE
value: PSERVER
volumes:
- name: model-config
configMap:
name: modelconfig
defaultMode: 0777
restartPolicy: OnFailure
- replicas: <$ TRAINER_NUM $>
policies:
- event: TaskCompleted
action: CompleteJob
name: trainer
template:
metadata:
labels:
paddle-job: paddle-rec
spec:
imagePullSecrets:
- name: default-secret
containers:
- image: <$ IMAGE $>
command:
- '/bin/bash'
args:
- "-c"
- |
set -ex
sh /usr/paddlerec/set_k8s_env.sh start_fluid
imagePullPolicy: Always
volumeMounts:
- name: model-config
mountPath: "/usr/paddlerec"
name: trainer
resources:
limits:
cpu: <$ CPU_NUM $>
memory: <$ MEMORY $>
ephemeral-storage: <$ STORAGE $>
requests:
cpu: 1
memory: 1Gi
ephemeral-storage: 1Gi
env:
- name: GLOG_v
value: "<$ GLOG_V $>"
- name: GLOG_logtostderr
value: "1"
- name: TRAINER_PACKAGE
value: /root/paddlejob
- name: PADDLE_INIT_NICS
value: eth2
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: PADDLE_CURRENT_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: PADDLE_JOB_NAME
value: paddle-rec
- name: PADDLE_IS_LOCAL
value: "0"
- name: FLAGS_rpc_deadline
value: "3600"
- name: PADDLE_PORT
value: "30240"
- name: PADDLE_PSERVERS_NUM
value: "<$ PSERVER_NUM $>"
- name: PADDLE_TRAINERS_NUM
value: "<$ TRAINER_NUM $>"
- name: PADDLE_TRAINING_ROLE
value: TRAINER
- name: TRAINING_ROLE
value: TRAINER
- name: ENTRY
value: python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER
volumes:
- name: model-config
configMap:
name: modelconfig
defaultMode: 0777
restartPolicy: OnFailure
#!/bin/bash
set -x
check_failed_cnt() {
max_failed=$1
failed_count=$(python -m paddlerec.tools.k8s_tools count_pods_by_phase paddle-job=${PADDLE_JOB_NAME} Failed)
if [ $failed_count -gt $max_failed ]; then
stdbuf -oL echo "Failed trainer count beyond the threadhold: "$max_failed
echo "Failed trainer count beyond the threshold: " $max_failed >/dev/termination-log
exit 0
fi
}
check_trainer_ret() {
ret=$1
stdbuf -oL echo "job returned $ret...setting pod return message..."
stdbuf -oL echo "==============================="
if [ $ret -eq 136 ]; then
echo "Error Arithmetic Operation(Floating Point Exception)" >/dev/termination-log
elif [ $ret -eq 139 ]; then
echo "Segmentation Fault" >/dev/termination-log
elif [ $ret -eq 1 ]; then
echo "General Error" >/dev/termination-log
elif [ $ret -eq 134 ]; then
echo "Program Abort" >/dev/termination-log
fi
stdbuf -oL echo "termination log wroted..."
exit $ret
}
start_fluid_process() {
pserver_label="paddle-job-pserver=${PADDLE_JOB_NAME}"
trainer_label="paddle-job=${PADDLE_JOB_NAME}"
hostname=${HOSTNAME}
task_index=""
if [ "${PADDLE_TRAINING_ROLE}" == "TRAINER" ] || [ "${PADDLE_TRAINING_ROLE}" == "PSERVER" ]; then
stdbuf -oL python -m paddlerec.tools.k8s_tools wait_pods_running ${pserver_label} ${PADDLE_PSERVERS_NUM}
fi
export PADDLE_PSERVERS_IP_PORT_LIST=$(python -m paddlerec.tools.k8s_tools fetch_endpoints ${pserver_label} ${PADDLE_PORT})
export PADDLE_TRAINER_IPS=$(python -m paddlerec.tools.k8s_tools fetch_ips ${trainer_label})
if [ "${PADDLE_TRAINING_ROLE}" == "TRAINER" ]; then
check_failed_cnt 1
task_index=$(python -m paddlerec.tools.k8s_tools fetch_id ${trainer_label})
else
task_index=$(python -m paddlerec.tools.k8s_tools fetch_id ${pserver_label})
fi
export PADDLE_TRAINER_ID=${task_index}
export PADDLE_PSERVER_ID=${task_index}
stdbuf -oL sh -c "${ENTRY}"
check_trainer_ret $?
}
usage() {
echo "usage: paddle_k8s [<args>]:"
echo " start_fluid Start paddle fluid distributed training, set env"
}
case "$1" in
start_fluid)
start_fluid_process
;;
--help)
usage
;;
*)
usage
;;
esac
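To make the bootstrap concrete, here is a hedged illustration of the environment a trainer container could end up with after `start_fluid_process` runs in a hypothetical 2-pserver / 2-trainer job (the IPs are invented):

```python
# Hypothetical values; the real ones are produced by paddlerec.tools.k8s_tools.
trainer_env = {
    "TRAINING_ROLE": "TRAINER",
    "PADDLE_PORT": "30240",
    "PADDLE_PSERVERS_IP_PORT_LIST": "10.0.0.11:30240,10.0.0.12:30240",
    "PADDLE_TRAINER_IPS": "10.0.0.21,10.0.0.22",
    "PADDLE_TRAINER_ID": "0",   # index of this pod among the trainer pods
    "ENTRY": "python -m paddlerec.run -m /usr/paddlerec/config.yaml -r WORKER",
}
```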
......@@ -31,7 +31,7 @@ class LocalClusterEngine(Engine):
server_num = self.envs["server_num"]
ports = [self.envs["start_port"]]
logs_dir = self.envs["log_dir"]
selected_gpus = self.envs["selected_gpus"].split(",")
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env["CLUSTER_INSTANCE"] = "1"
......@@ -97,25 +97,8 @@ class LocalClusterEngine(Engine):
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
# only wait worker to finish here
for i, proc in enumerate(procs):
if i < server_num:
continue
procs[i].wait()
if len(log_fns) > 0:
log_fns[i].close()
for i in range(server_num):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
print(
"all workers already completed, you can view logs under the `{}` directory".
format(logs_dir),
file=sys.stderr)
elif fleet_mode.upper() == "COLLECTIVE":
selected_gpus = self.envs["selected_gpus"].split(",")
selected_gpus_num = len(selected_gpus)
for i in range(selected_gpus_num - 1):
......@@ -150,5 +133,22 @@ class LocalClusterEngine(Engine):
cwd=os.getcwd())
procs.append(proc)
# only wait worker to finish here
for i, proc in enumerate(procs):
if i < server_num:
continue
procs[i].wait()
if len(log_fns) > 0:
log_fns[i].close()
for i in range(server_num):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
print(
"all workers already completed, you can view logs under the `{}` directory".
format(logs_dir),
file=sys.stderr)
def run(self):
self.start_procs()
......@@ -134,6 +134,10 @@ def os_path_adapter(value):
def workspace_adapter(value):
workspace = global_envs.get("workspace")
return workspace_adapter_by_specific(value, workspace)
def workspace_adapter_by_specific(value, workspace):
workspace = paddlerec_adapter(workspace)
value = value.replace("{workspace}", workspace)
return value
......
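A hedged usage example of the new `workspace_adapter_by_specific` helper; the workspace path is hypothetical, and absolute workspaces are assumed to pass through `paddlerec_adapter` unchanged:

```python
from paddlerec.core.utils import envs

# "{workspace}" placeholders in config values expand to the given workspace.
print(envs.workspace_adapter_by_specific("{workspace}/data/train",
                                         "/home/work/models/rank/dnn"))
# expected: /home/work/models/rank/dnn/data/train
```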
......@@ -117,7 +117,8 @@ def register():
in_value_handler)
validations["train.epochs"] = ValueFormat("int", 1, ge_value_handler)
validations["train.engine"] = ValueFormat(
"str", ["single", "local_cluster", "cluster"], in_value_handler)
"str", ["train", "infer", "local_cluster_train", "cluster_train"],
in_value_handler)
requires = ["workspace", "dataset", "mode", "runner", "phase"]
return validations, requires
......
......@@ -56,9 +56,9 @@ engine = which_engine(args)
engine.run()
```
Taking the `single engine` as an example, here is an overview of engine behavior:
Taking the `single-machine engine` as an example, here is an overview of engine behavior:
```python
def single_train_engine(args):
def train_engine(args):
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get(
......
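For orientation, a hedged sketch of the flattened `run_extras` dict that `train_engine` reads for a config whose active runner is named `train_runner` (the `runner.<name>.<field>` key pattern matches run.py; the values are hypothetical):

```python
# Hypothetical flattened view of config.yaml produced by get_all_inters_from_yaml.
run_extras = {
    "runner.train_runner.class": "train",
    "runner.train_runner.trainer_class": "GeneralTrainer",
    "runner.train_runner.device": "cpu",
    "runner.train_runner.epochs": 10,
}
# train_engine falls back to "GeneralTrainer" when trainer_class is absent.
trainer = run_extras.get("runner.train_runner.trainer_class", "GeneralTrainer")
```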
......@@ -136,7 +136,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 10
save_checkpoint_interval: 2
......
......@@ -77,7 +77,7 @@ mode: single_cpu_train # run the runner named single_cpu_train
runner:
- name: single_cpu_train # define a runner named single_cpu_train
class: train # run single-machine training; single_train is also accepted
class: train # run single-machine training
device: cpu # run on cpu
epochs: 10 # number of training epochs
......@@ -120,7 +120,7 @@ mode: single_gpu_train # run the runner named single_gpu_train
runner:
- name: single_gpu_train # define a runner named single_gpu_train
class: train # run single-machine training; single_train is also accepted
class: train # run single-machine training
device: gpu # run on gpu
selected_gpus: "0" # train on the card with id=0 by default
epochs: 10 # number of training epochs
......@@ -135,7 +135,7 @@ mode: single_multi_gpu_train # run the runner named single_multi_gpu_train
runner:
- name: single_multi_gpu_train # define a runner named single_multi_gpu_train
class: train # run single-machine training; single_train is also accepted
class: train # run single-machine training
device: gpu # run on gpu
selected_gpus: "0,1,2,3" # train on multiple cards
epochs: 10 # number of training epochs
......@@ -149,7 +149,7 @@ mode: local_cluster_cpu_train # run the runner named local_cluster_cpu_train
runner:
- name: local_cluster_cpu_train # define a runner named local_cluster_cpu_train
class: local_cluster # run locally simulated distributed (parameter-server) training
class: local_cluster_train # run locally simulated distributed (parameter-server) training
device: cpu # run on cpu (later paddle releases will support PS-GPU)
worker_num: 1 # (optional) number of worker processes, default 1
server_num: 1 # (optional) number of server processes, default 1
......
......@@ -137,7 +137,7 @@ class TerminalBase(object):
```yaml
runner:
- name: train_runner
class: single_train
class: train
epochs: 2
device: cpu
instance_class_path: "{workspace}/your_instance.py"
......@@ -174,7 +174,7 @@ class Startup(StartupBase):
```yaml
runner:
- name: runner1
class: single_train
class: train
startup_class_path: "{workspace}/tdm_startup.py"
epochs: 10
device: cpu
......
......@@ -15,7 +15,7 @@
| Name | Type | Values | Required | Description |
| :---------------------------: | :----------: | :-------------------------------------------: | :------: | :------------------------------------------------------------------: |
| name | string | any | yes | name of the runner |
| class | string | train (default) / infer / local_cluster / cluster | yes | category of the runner (single-machine/distributed, train/infer) |
| class | string | train (default) / infer / local_cluster_train / cluster_train | yes | category of the runner (single-machine/distributed, train/infer) |
| device | string | cpu (default) / gpu | no | device the program runs on |
| fleet_mode | string | ps (default) / pslib / collective | no | distributed running mode |
| selected_gpus | string | "0" (default) | no | GPU card ids for the run; specifying multiple cards such as "0,1" enables collective mode by default |
......
......@@ -31,7 +31,7 @@ mode: runner1
runner:
- name: runner1
class: single_train
class: train
epochs: 10
device: cpu
save_checkpoint_interval: 2
......
......@@ -38,7 +38,7 @@ mode: runner1
runner:
- name: runner1
class: single_train
class: train
epochs: 10
device: cpu
save_checkpoint_interval: 2
......
......@@ -55,7 +55,7 @@ mode: runner_train
#mode: runner_infer
runner:
- name: runner_train
class: single_train
class: train
save_checkpoint_interval: 1 # save model interval of epochs
save_inference_interval: 1 # save inference
save_checkpoint_path: "increment" # save checkpoint path
......
......@@ -55,7 +55,7 @@ mode: runner_train
#mode: runner_infer
runner:
- name: runner_train
class: single_train
class: train
save_checkpoint_interval: 1 # save model interval of epochs
save_inference_interval: 1 # save inference
save_checkpoint_path: "increment" # save checkpoint path
......
......@@ -42,7 +42,7 @@ mode: train_runner
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: train_runner
class: single_train
class: train
# num of epochs
epochs: 4
# device to run training or infer
......
......@@ -49,7 +49,7 @@ mode: train_runner
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: train_runner
class: single_train
class: train
# num of epochs
epochs: 2
# device to run training or infer
......
......@@ -43,7 +43,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -42,7 +42,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -49,7 +49,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 2
device: cpu
init_model_path: ""
......
......@@ -50,7 +50,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -48,7 +48,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -48,7 +48,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 2
device: cpu
init_model_path: ""
......
......@@ -46,7 +46,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -47,7 +47,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -52,7 +52,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -47,7 +47,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -47,7 +47,7 @@ mode: train_FM_runner #for FM phase: train_FM_runner for dnn phase: train_DNN_ru
runner:
- name: train_FM_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......@@ -57,7 +57,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: train_DNN_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -46,7 +46,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 2
device: cpu
init_model_path: ""
......
......@@ -53,7 +53,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -50,7 +50,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -45,7 +45,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -46,7 +46,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
epochs: 1
device: cpu
init_model_path: ""
......
......@@ -50,7 +50,7 @@ mode: train_runner
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: train_runner
class: single_train
class: train
# num of epochs
epochs: 2
# device to run training or infer
......
......@@ -47,7 +47,7 @@ mode: train_runner
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: train_runner
class: single_train
class: train
# num of epochs
epochs: 2
# device to run training or infer
......
......@@ -45,7 +45,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -42,7 +42,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -41,7 +41,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -47,7 +47,7 @@ mode: train_runner
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: train_runner
class: single_train
class: train
# num of epochs
epochs: 2
# device to run training or infer
......
......@@ -38,7 +38,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -43,7 +43,7 @@ mode: train_runner
runner:
- name: train_runner
class: single_train
class: train
device: cpu
epochs: 3
save_checkpoint_interval: 2
......
......@@ -64,7 +64,7 @@ mode: runner1
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: runner1
class: single_train
class: train
startup_class_path: "{workspace}/tdm_startup.py"
# num of epochs
epochs: 10
......
......@@ -26,10 +26,8 @@ from paddlerec.core.utils import validation
engines = {}
device = ["CPU", "GPU"]
engine_choices = [
"TRAIN", "SINGLE_TRAIN", "INFER", "SINGLE_INFER", "LOCAL_CLUSTER",
"LOCAL_CLUSTER_TRAIN", "CLUSTER_TRAIN"
]
engine_choices = ["TRAIN", "INFER", "LOCAL_CLUSTER_TRAIN", "CLUSTER_TRAIN"]
def engine_registry():
......@@ -37,16 +35,11 @@ def engine_registry():
engines["PSLIB"] = {}
engines["TRANSPILER"]["TRAIN"] = single_train_engine
engines["TRANSPILER"]["SINGLE_TRAIN"] = single_train_engine
engines["TRANSPILER"]["INFER"] = single_infer_engine
engines["TRANSPILER"]["SINGLE_INFER"] = single_infer_engine
engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER"] = cluster_engine
engines["PSLIB"]["SINGLE_TRAIN"] = local_mpi_engine
engines["PSLIB"]["TRAIN"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine
engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
......@@ -142,7 +135,7 @@ def get_engine(args, running_config, mode):
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num > 1:
engine = "LOCAL_CLUSTER"
engine = "LOCAL_CLUSTER_TRAIN"
if engine not in engine_choices:
raise ValueError("{} can not be chosen in {}".format(engine_class,
......@@ -219,7 +212,6 @@ def single_train_engine(args):
def single_infer_engine(args):
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
......@@ -260,48 +252,60 @@ def single_infer_engine(args):
def cluster_engine(args):
def master():
role = "MASTER"
from paddlerec.core.engine.cluster.cluster import ClusterEngine
_envs = envs.load_yaml(args.backend)
flattens = envs.flatten_environs(_envs, "_")
flattens["engine_role"] = role
flattens["engine_role"] = "MASTER"
flattens["engine_mode"] = envs.get_runtime_environ("mode")
flattens["engine_run_config"] = args.model
flattens["engine_temp_path"] = tempfile.mkdtemp()
envs.set_runtime_environs(flattens)
ClusterEngine.workspace_replace()
print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))
launch = ClusterEngine(None, args.model)
return launch
def worker():
role = "WORKER"
def worker(mode):
if not mode:
raise ValueError("mode: {} can not be recognized")
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
strategy_class = ".".join(["runner", mode, "distribute_strategy"])
worker_class = ".".join(["runner", mode, "worker_num"])
server_class = ".".join(["runner", mode, "server_num"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
distributed_strategy = run_extras.get(strategy_class, "async")
worker_num = run_extras.get(worker_class, 1)
server_num = run_extras.get(server_class, 1)
executor_mode = "train"
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model,
["train.", "runner."])
trainer_class = run_extras.get(
"runner." + _envs["mode"] + ".trainer_class", None)
device = device.upper()
fleet_mode = fleet_mode.upper()
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
if fleet_mode == "COLLECTIVE" and device != "GPU":
raise ValueError("COLLECTIVE can not be used with GPU")
executor_mode = "train"
cluster_envs = {}
distributed_strategy = run_extras.get(
"runner." + _envs["mode"] + ".distribute_strategy", "async")
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
"ps")
if device == "GPU":
cluster_envs["selected_gpus"] = selected_gpus
cluster_envs = {}
cluster_envs["selected_gpus"] = selected_gpus
cluster_envs["server_num"] = server_num
cluster_envs["worker_num"] = worker_num
cluster_envs["fleet_mode"] = fleet_mode
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.executor_mode"] = executor_mode
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.executor_mode"] = executor_mode
cluster_envs["train.trainer.strategy"] = distributed_strategy
cluster_envs["train.trainer.threads"] = envs.get_runtime_environ(
"CPU_NUM")
......@@ -316,7 +320,8 @@ def cluster_engine(args):
role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")
if role == "WORKER":
return worker()
mode = os.getenv("PADDLE_PADDLEREC_MODE", None)
return worker(mode)
else:
return master()
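Putting the lookups together, a hedged example of the `cluster_envs` a cluster worker might build for a hypothetical parameter-server runner with 2 pservers and 2 trainers (matching the commit's 2ps/2tr scenario; values are illustrative):

```python
# Illustrative only; real values are read from runner.<mode>.* keys with defaults.
cluster_envs = {
    "server_num": 2,
    "worker_num": 2,
    "fleet_mode": "PS",
    "train.trainer.trainer": "GeneralTrainer",
    "train.trainer.executor_mode": "train",
    "train.trainer.engine": "cluster",
    "train.trainer.strategy": "async",
}
```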
......@@ -336,39 +341,83 @@ def cluster_mpi_engine(args):
def local_cluster_engine(args):
from paddlerec.core.engine.local_cluster import LocalClusterEngine
def get_worker_num(run_extras, workers):
_envs = envs.load_yaml(args.model)
mode = envs.get_runtime_environ("mode")
workspace = envs.get_runtime_environ("workspace")
phases_class = ".".join(["runner", mode, "phases"])
phase_names = run_extras.get(phases_class)
phases = []
all_phases = _envs.get("phase")
if phase_names is None:
phases = all_phases
else:
for phase in all_phases:
if phase["name"] in phase_names:
phases.append(phase)
dataset_names = []
for phase in phases:
dataset_names.append(phase["dataset_name"])
datapaths = []
for dataset in _envs.get("dataset"):
if dataset["name"] in dataset_names:
datapaths.append(dataset["data_path"])
if not datapaths:
raise ValueError("data path must exist for training/inference")
datapaths = [
envs.workspace_adapter_by_specific(path, workspace)
for path in datapaths
]
all_workers = [len(os.listdir(path)) for path in datapaths]
all_workers.append(workers)
return min(all_workers)
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get("runner." + _envs["mode"] + ".runner_class",
None)
from paddlerec.core.engine.local_cluster import LocalClusterEngine
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
strategy_class = ".".join(["runner", mode, "distribute_strategy"])
worker_class = ".".join(["runner", mode, "worker_num"])
server_class = ".".join(["runner", mode, "server_num"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
distributed_strategy = run_extras.get(strategy_class, "async")
executor_mode = "train"
distributed_strategy = run_extras.get(
"runner." + _envs["mode"] + ".distribute_strategy", "async")
worker_num = run_extras.get("runner." + _envs["mode"] + ".worker_num", 1)
server_num = run_extras.get("runner." + _envs["mode"] + ".server_num", 1)
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode", "")
if fleet_mode == "":
device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
if len(selected_gpus.split(",")) > 1 and device.upper() == "GPU":
fleet_mode = "COLLECTIVE"
else:
fleet_mode = "PS"
worker_num = run_extras.get(worker_class, 1)
server_num = run_extras.get(server_class, 1)
max_worker_num = get_worker_num(run_extras, worker_num)
if max_worker_num < worker_num:
print(
"has phase do not have enough datas for training, set worker num from {} to {}".
format(worker_num, max_worker_num))
worker_num = max_worker_num
device = device.upper()
fleet_mode = fleet_mode.upper()
if fleet_mode == "COLLECTIVE" and device != "GPU":
raise ValueError("COLLECTIVE can not be used with GPU")
cluster_envs = {}
if device == "GPU":
cluster_envs["selected_gpus"] = selected_gpus
cluster_envs["server_num"] = server_num
cluster_envs["worker_num"] = worker_num
cluster_envs["selected_gpus"] = selected_gpus
cluster_envs["start_port"] = envs.find_free_port()
cluster_envs["fleet_mode"] = fleet_mode
cluster_envs["log_dir"] = "logs"
......@@ -376,10 +425,10 @@ def local_cluster_engine(args):
cluster_envs["train.trainer.executor_mode"] = executor_mode
cluster_envs["train.trainer.strategy"] = distributed_strategy
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["CPU_NUM"] = cluster_envs["train.trainer.threads"]
cluster_envs["train.trainer.engine"] = "local_cluster"
cluster_envs["train.trainer.platform"] = envs.get_platform()
cluster_envs["CPU_NUM"] = "2"
print("launch {} engine with cluster to run model: {}".format(trainer,
args.model))
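The `get_worker_num` helper above caps the requested worker count by the number of data files available to the selected phases, so no worker starts without a file to read. A minimal, self-contained sketch of that rule (paths are hypothetical):

```python
import os

def cap_worker_num(data_paths, requested_workers):
    # At most one worker per data file, and never more than requested.
    file_counts = [len(os.listdir(path)) for path in data_paths]
    return min(file_counts + [requested_workers])

# e.g. if the training phase's dataset directory holds 3 files and
# worker_num is 4, the local cluster launches only 3 workers.
```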
......@@ -400,20 +449,16 @@ def local_mpi_engine(args):
if not mpi:
raise RuntimeError("can not find mpirun, please check environment")
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get("runner." + _envs["mode"] + ".runner_class",
None)
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
distributed_strategy = "async"
executor_mode = "train"
distributed_strategy = run_extras.get(
"runner." + _envs["mode"] + ".distribute_strategy", "async")
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
"ps")
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
cluster_envs = {}
cluster_envs["mpirun"] = mpi
......@@ -424,7 +469,6 @@ def local_mpi_engine(args):
cluster_envs["fleet_mode"] = fleet_mode
cluster_envs["train.trainer.strategy"] = distributed_strategy
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["train.trainer.engine"] = "local_cluster"
cluster_envs["train.trainer.platform"] = envs.get_platform()
set_runtime_envs(cluster_envs, args.model)
......@@ -458,11 +502,15 @@ if __name__ == "__main__":
sys.exit(-1)
engine_registry()
running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."])
running_config = get_all_inters_from_yaml(
args.model, ["workspace", "mode", "runner."])
modes = get_modes(running_config)
for mode in modes:
envs.set_runtime_environs({"mode": mode})
envs.set_runtime_environs({
"mode": mode,
"workspace": running_config["workspace"]
})
which_engine = get_engine(args, running_config, mode)
engine = which_engine(args)
engine.run()
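A hedged sketch of what the updated driver loop does for a config that (hypothetically) declares two runners; the exact way multiple modes are written in config.yaml is assumed here:

```python
# Hypothetical flattened running_config; the real one comes from
# get_all_inters_from_yaml(args.model, ["workspace", "mode", "runner."]).
running_config = {
    "workspace": "models/rank/dnn",
    "mode": ["runner_train", "runner_infer"],   # assumption: a list of runner names
    "runner.runner_train.class": "train",
    "runner.runner_infer.class": "infer",
}

for mode in ["runner_train", "runner_infer"]:   # what get_modes() would yield
    # Both the active runner name and the workspace are now published as
    # runtime environs before the engine for this mode is built and run.
    print({"mode": mode, "workspace": running_config["workspace"]})
```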
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# !/bin/env python
import os
import sys
import time
import socket
from kubernetes import client, config
NAMESPACE = os.getenv("NAMESPACE")
if os.getenv("KUBERNETES_SERVICE_HOST", None):
config.load_incluster_config()
else:
config.load_kube_config()
v1 = client.CoreV1Api()
def get_pod_status(item):
phase = item.status.phase
# check terminate time although phase is Running.
if item.metadata.deletion_timestamp != None:
return "Terminating"
return phase
def containers_all_ready(label_selector):
def container_statuses_ready(item):
container_statuses = item.status.container_statuses
for status in container_statuses:
if not status.ready:
return False
return True
api_response = v1.list_namespaced_pod(
namespace=NAMESPACE, pretty=True, label_selector=label_selector)
for item in api_response.items:
if not container_statuses_ready(item):
return False
return True
def fetch_pods_info(label_selector, phase=None):
api_response = v1.list_namespaced_pod(
namespace=NAMESPACE, pretty=True, label_selector=label_selector)
pod_list = []
for item in api_response.items:
if phase is not None and get_pod_status(item) != phase:
continue
pod_list.append(
(item.status.phase, item.status.pod_ip, item.metadata.name))
return pod_list
def wait_pods_running(label_selector, desired):
print("label selector: %s, desired: %s" % (label_selector, desired))
while True:
count = count_pods_by_phase(label_selector, 'Running')
# NOTE: pods may be scaled.
if count >= int(desired):
break
print('current cnt: %d sleep for 5 seconds...' % count)
time.sleep(5)
def wait_containers_ready(label_selector):
print("label selector: %s, wait all containers ready" % (label_selector))
while True:
if containers_all_ready(label_selector):
break
print('not all containers ready, sleep for 5 seconds...')
time.sleep(5)
def count_pods_by_phase(label_selector, phase):
pod_list = fetch_pods_info(label_selector, phase)
return len(pod_list)
def fetch_ips_list(label_selector, phase=None):
pod_list = fetch_pods_info(label_selector, phase)
ips = [item[1] for item in pod_list]
ips.sort()
return ips
def fetch_name_list(label_selector, phase=None):
pod_list = fetch_pods_info(label_selector, phase)
names = [item[2] for item in pod_list]
names.sort()
return names
def fetch_ips_string(label_selector, phase=None):
ips = fetch_ips_list(label_selector, phase)
return ",".join(ips)
def fetch_endpoints_string(label_selector, port, phase=None, sameport=True):
ips = fetch_ips_list(label_selector, phase)
if sameport:
ips = ["{0}:{1}".format(ip, port) for ip in ips]
else:
srcips = ips
ips = []
port = int(port)
for ip in srcips:
ips.append("{0}:{1}".format(ip, port))
port = port + 1
return ",".join(ips)
def fetch_pod_id(label_selector, phase=None, byname=True):
if byname:
names = fetch_name_list(label_selector, phase=phase)
local_name = os.getenv('POD_NAME')
for i in range(len(names)):
if names[i] == local_name:
return i
return None
else:
ips = fetch_ips_list(label_selector, phase=phase)
local_ip = socket.gethostbyname(socket.gethostname())
for i in range(len(ips)):
if ips[i] == local_ip:
return i
# in minikube there can be one node only
local_ip = os.getenv("POD_IP")
for i in range(len(ips)):
if ips[i] == local_ip:
return i
return None
def fetch_ips(label_selector):
return fetch_ips_string(label_selector, phase="Running")
def fetch_endpoints(label_selector, port):
return fetch_endpoints_string(
label_selector, port=port, phase="Running", sameport=True)
def fetch_id(label_selector):
return fetch_pod_id(label_selector, phase="Running")
if __name__ == "__main__":
command = sys.argv[1]
if command == "fetch_ips":
print(fetch_ips(sys.argv[2]))
if command == "fetch_ips_string":
print(fetch_ips_string(sys.argv[2], sys.argv[3]))
elif command == "fetch_endpoints":
print(fetch_endpoints(sys.argv[2], sys.argv[3]))
elif command == "fetch_id":
print(fetch_id(sys.argv[2]))
elif command == "count_pods_by_phase":
print(count_pods_by_phase(sys.argv[2], sys.argv[3]))
elif command == "wait_pods_running":
wait_pods_running(sys.argv[2], sys.argv[3])
elif command == "wait_containers_ready":
wait_containers_ready(sys.argv[2])
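set_k8s_env.sh drives this helper as a CLI, e.g. `python -m paddlerec.tools.k8s_tools wait_pods_running <label> <count>`; the same functions can also be called directly. A hedged example with a hypothetical label selector (requires the `kubernetes` client and a `NAMESPACE` env var, as above):

```python
from paddlerec.tools import k8s_tools

pserver_label = "paddle-job-pserver=paddle-rec"   # hypothetical label selector
k8s_tools.wait_pods_running(pserver_label, 2)     # block until 2 pservers are Running
print(k8s_tools.fetch_endpoints(pserver_label, 30240))  # "ip:port,ip:port"
```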