Unverified commit 15c57177, authored by Chengmo, committed by GitHub

add cluster support (#136)

* add cluster support
Parent: b8e17866
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
pip uninstall -y paddlepaddle
pip install paddlepaddle-gpu==<$ PADDLEPADDLE_VERSION $> --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
echo "End before_hook.sh ..."
echo "Run before_hook.sh ..."
wget https://paddlerec.bj.bcebos.com/whl/PaddleRec.tar.gz
tar -xf PaddleRec.tar.gz
cd PaddleRec
python setup.py install
pip uninstall -y paddlepaddle
pip install paddlepaddle-gpu==<$ PADDLEPADDLE_VERSION $>.post107 --index-url=http://pip.baidu.com/pypi/simple --trusted-host pip.baidu.com
echo "End before_hook.sh ..."
cloud/cluster.sh:
@@ -16,23 +16,13 @@
###################################################
# Usage: submit.sh
# Description: run paddlecloud submit client implement
###################################################
# ---------------------------------------------------------------------------- #
#                                variable define                               #
# ---------------------------------------------------------------------------- #
#-----------------------------------------------------------------------------------------------------------------
#fun : before hook submit to cluster
#param : N/A
@@ -40,17 +30,106 @@ function package_hook() {
#-----------------------------------------------------------------------------------------------------------------
function _before_submit() {
echo "before_submit"
if [ ${DISTRIBUTE_MODE} == "PS_CPU_MPI" ]; then
_gen_cpu_before_hook
_gen_mpi_config
_gen_mpi_job
_gen_end_hook
elif [ ${DISTRIBUTE_MODE} == "COLLECTIVE_GPU_K8S" ]; then
_gen_gpu_before_hook
_gen_k8s_config
_gen_k8s_job
_gen_end_hook
fi
}
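DISTRIBUTE_MODE is exported by the env-check classes in cluster.py further below: PaddleCloudMpiEnv sets PS_CPU_MPI and CloudCollectiveEnv sets COLLECTIVE_GPU_K8S, so the dispatch above resolves as:

# PS_CPU_MPI         -> mpi_config.ini + mpi_job.sh + CPU before_hook (parameter-server training)
# COLLECTIVE_GPU_K8S -> k8s_config.ini + k8s_job.sh + GPU before_hook (collective training)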
function _gen_mpi_config() {
echo "gen mpi_config.ini"
sed -e "s#<$ FS_NAME $>#$FS_NAME#g" \
-e "s#<$ FS_UGI $>#$FS_UGI#g" \
-e "s#<$ TRAIN_DATA_PATH $>#$TRAIN_DATA_PATH#g" \
-e "s#<$ TEST_DATA_PATH $>#$TEST_DATA_PATH#g" \
-e "s#<$ OUTPUT_PATH $>#$OUTPUT_PATH#g" \
-e "s#<$ THIRDPARTY_PATH $>#$THIRDPARTY_PATH#g" \
-e "s#<$ CPU_NUM $>#$max_thread_num#g" \
-e "s#<$ FLAGS_communicator_is_sgd_optimizer $>#$FLAGS_communicator_is_sgd_optimizer#g" \
-e "s#<$ FLAGS_communicator_send_queue_size $>#$FLAGS_communicator_send_queue_size#g" \
-e "s#<$ FLAGS_communicator_thread_pool_size $>#$FLAGS_communicator_thread_pool_size#g" \
-e "s#<$ FLAGS_communicator_max_merge_var_num $>#$FLAGS_communicator_max_merge_var_num#g" \
-e "s#<$ FLAGS_communicator_max_send_grad_num_before_recv $>#$FLAGS_communicator_max_send_grad_num_before_recv#g" \
-e "s#<$ FLAGS_communicator_fake_rpc $>#$FLAGS_communicator_fake_rpc#g" \
-e "s#<$ FLAGS_rpc_retry_times $>#$FLAGS_rpc_retry_times#g" \
${abs_dir}/cloud/mpi_config.ini.template >${PWD}/config.ini
}
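Each <$ NAME $> placeholder is rewritten in place by the sed pass above. For illustration, with a hypothetical FS_NAME of afs://xxx.com, one template line renders as:

# template:  fs_name=<$ FS_NAME $>
# rendered:  fs_name=afs://xxx.com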
function _gen_k8s_config() {
echo "gen k8s_config.ini"
sed -e "s#<$ FS_NAME $>#$FS_NAME#g" \
-e "s#<$ FS_UGI $>#$FS_UGI#g" \
-e "s#<$ AFS_REMOTE_MOUNT_POINT $>#$AFS_REMOTE_MOUNT_POINT#g" \
-e "s#<$ OUTPUT_PATH $>#$OUTPUT_PATH#g" \
-e "s#<$ CPU_NUM $>#$max_thread_num#g" \
-e "s#<$ FLAGS_communicator_is_sgd_optimizer $>#$FLAGS_communicator_is_sgd_optimizer#g" \
-e "s#<$ FLAGS_communicator_send_queue_size $>#$FLAGS_communicator_send_queue_size#g" \
-e "s#<$ FLAGS_communicator_thread_pool_size $>#$FLAGS_communicator_thread_pool_size#g" \
-e "s#<$ FLAGS_communicator_max_merge_var_num $>#$FLAGS_communicator_max_merge_var_num#g" \
-e "s#<$ FLAGS_communicator_max_send_grad_num_before_recv $>#$FLAGS_communicator_max_send_grad_num_before_recv#g" \
-e "s#<$ FLAGS_communicator_fake_rpc $>#$FLAGS_communicator_fake_rpc#g" \
-e "s#<$ FLAGS_rpc_retry_times $>#$FLAGS_rpc_retry_times#g" \
${abs_dir}/cloud/k8s_config.ini.template >${PWD}/config.ini
}
function _gen_cpu_before_hook() {
echo "gen cpu before_hook.sh"
sed -e "s#<$ PADDLEPADDLE_VERSION $>#$PADDLE_VERSION#g" \
${abs_dir}/cloud/before_hook_cpu.sh.template >${PWD}/before_hook.sh
}
function _gen_gpu_before_hook() {
echo "gen gpu before_hook.sh"
sed -e "s#<$ PADDLEPADDLE_VERSION $>#$PADDLE_VERSION#g" \
${abs_dir}/cloud/before_hook_gpu.sh.template >${PWD}/before_hook.sh
}
function _gen_end_hook() {
echo "gen end_hook.sh"
cp ${abs_dir}/cloud/end_hook.sh.template ${PWD}/end_hook.sh
}
function _gen_mpi_job() {
echo "gen mpi_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ MPI_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ MPI_NODES $>#$MPI_NODES#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/mpi_job.sh.template >${PWD}/job.sh
}
function _gen_k8s_job() {
echo "gen k8s_job.sh"
sed -e "s#<$ GROUP_NAME $>#$GROUP_NAME#g" \
-e "s#<$ AK $>#$AK#g" \
-e "s#<$ SK $>#$SK#g" \
-e "s#<$ K8S_PRIORITY $>#$PRIORITY#g" \
-e "s#<$ K8S_TRAINERS $>#$K8S_TRAINERS#g" \
-e "s#<$ K8S_GPU_CARD $>#$K8S_GPU_CARD#g" \
-e "s#<$ START_CMD $>#$START_CMD#g" \
${abs_dir}/cloud/k8s_job.sh.template >${PWD}/job.sh
}
#-----------------------------------------------------------------------------------------------------------------
#fun : after hook submit to cluster
#param : N/A
#return : 0 -- success; not 0 -- failure
#-----------------------------------------------------------------------------------------------------------------
function _after_submit() {
echo "end submit"
}
#-----------------------------------------------------------------------------------------------------------------
@@ -60,23 +139,18 @@ function _after_submit() {
#-----------------------------------------------------------------------------------------------------------------
function _submit() {
g_run_stage="submit"
sh job.sh
}
function package_hook() {
cur_time=`date +"%Y%m%d%H%M"`
new_job_name="${JOB_NAME}_${cur_time}"
export JOB_NAME=${new_job_name}
export job_file_path="${PWD}/${new_job_name}"
mkdir ${job_file_path}
cp $FILES ${job_file_path}/
cd ${job_file_path}
echo "The task submission folder is generated at ${job_file_path}"
}
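A sketch of what package_hook produces, assuming the JOB_NAME default "PaddleRec_CTR" from backend.yaml below and a hypothetical submission time of 2020-06-01 12:00:

# JOB_NAME       -> PaddleRec_CTR_202006011200
# job_file_path  -> ${PWD}/PaddleRec_CTR_202006011200, populated with the files listed in submit.files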
function submit_hook() {
@@ -86,8 +160,6 @@ function submit_hook() {
}
function main() {
package_hook
submit_hook
}
......
echo "Run before_hook.sh ..."
\ No newline at end of file
cloud/k8s_config.ini.template:
# Required parameters
fs_name=<$ FS_NAME $>
fs_ugi=<$ FS_UGI $>
# Model output directory
output_path=<$ OUTPUT_PATH $>
# ===================
# Newly added parameters below
# ===================
# Switch for mounting AFS
mount_afs="true"
# Remote mount point of the AFS path
AFS_REMOTE_MOUNT_POINT=<$ AFS_REMOTE_MOUNT_POINT $>
# Local mount point of the job's runtime environment; /root/paddlejob/workspace/env_run/ is a fixed path, the platform's runtime workspace
afs_local_mount_point="/root/paddlejob/workspace/env_run/afs/"
# Files in the mounted directory can be accessed via ./afs/ under the runtime's default working folder
# AFS mounting guide for the new k8s: http://wiki.baidu.com/pages/viewpage.action?pageId=906443193
PADDLE_PADDLEREC_ROLE=WORKER
CPU_NUM=<$ CPU_NUM $>
GLOG_v=0
FLAGS_communicator_is_sgd_optimizer=<$ FLAGS_communicator_is_sgd_optimizer $>
FLAGS_communicator_send_queue_size=<$ FLAGS_communicator_send_queue_size $>
FLAGS_communicator_thread_pool_size=<$ FLAGS_communicator_thread_pool_size $>
FLAGS_communicator_max_merge_var_num=<$ FLAGS_communicator_max_merge_var_num $>
FLAGS_communicator_max_send_grad_num_before_recv=<$ FLAGS_communicator_max_send_grad_num_before_recv $>
FLAGS_communicator_fake_rpc=<$ FLAGS_communicator_fake_rpc $>
FLAGS_rpc_retry_times=<$ FLAGS_rpc_retry_times $>
\ No newline at end of file
cloud/k8s_job.sh.template:
#!/bin/bash
###############################################################
##              ATTENTION -- ATTENTION -- ATTENTION          ##
##         Example of a multi-node K8S NCCL2 job             ##
###############################################################
job_name=${JOB_NAME}
# Job parameters
group_name="<$ GROUP_NAME $>"
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="10:00:00"
k8s_priority=<$ K8S_PRIORITY $>
k8s_trainers=<$ K8S_TRAINERS $>
k8s_gpu_cards=<$ K8S_GPU_CARD $>
# Your AK/SK (available from the [Personal Center] page of the PaddleCloud web console)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train --job-name ${job_name} \
--group-name ${group_name} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--job-version ${job_version} \
--k8s-trainers ${k8s_trainers} \
--k8s-gpu-cards ${k8s_gpu_cards} \
--k8s-priority ${k8s_priority} \
--wall-time ${wall_time} \
--is-standalone 0 \
--distribute-job-type "NCCL2" \
--json
\ No newline at end of file
cloud/mpi_config.ini.template:
# type of storage cluster
storage_type="hdfs"
#attention: files for training should be put on hdfs
force_reuse_output_path="True"
# Replace with your own HDFS cluster
fs_name=<$ FS_NAME $>
fs_ugi=<$ FS_UGI $>
FLAGS_rpc_deadline=300000
##train data path on hdfs
train_data_path=<$ TRAIN_DATA_PATH $>
test_data_path=<$ TEST_DATA_PATH $>
output_path=<$ OUTPUT_PATH $>
thirdparty_path=<$ THIRDPARTY_PATH $>
PADDLE_PADDLEREC_ROLE=WORKER
CPU_NUM=<$ CPU_NUM $>
GLOG_v=0
FLAGS_communicator_is_sgd_optimizer=<$ FLAGS_communicator_is_sgd_optimizer $>
FLAGS_communicator_send_queue_size=<$ FLAGS_communicator_send_queue_size $>
FLAGS_communicator_thread_pool_size=<$ FLAGS_communicator_thread_pool_size $>
FLAGS_communicator_max_merge_var_num=<$ FLAGS_communicator_max_merge_var_num $>
FLAGS_communicator_max_send_grad_num_before_recv=<$ FLAGS_communicator_max_send_grad_num_before_recv $>
FLAGS_communicator_fake_rpc=<$ FLAGS_communicator_fake_rpc $>
FLAGS_rpc_retry_times=<$ FLAGS_rpc_retry_times $>
cloud/mpi_job.sh.template:
#!/bin/bash
###############################################################
##              ATTENTION -- ATTENTION -- ATTENTION          ##
##               Demo of an MPI-type job                     ##
###############################################################
job_name=${JOB_NAME}
# Job parameters
group_name=<$ GROUP_NAME $>
job_version="paddle-fluid-v1.7.1"
start_cmd="<$ START_CMD $>"
wall_time="2:00:00"
# Your AK/SK (available from the [Personal Center] page of the PaddleCloud web console)
ak=<$ AK $>
sk=<$ SK $>
paddlecloud job --ak ${ak} --sk ${sk} \
train \
--job-name ${job_name} \
--mpi-priority <$ MPI_PRIORITY $> \
--group-name ${group_name} \
--mpi-wall-time ${wall_time} \
--mpi-nodes <$ MPI_NODES $> \
--is-standalone 0 \
--permission group \
--job-version ${job_version} \
--job-conf config.ini \
--start-cmd "${start_cmd}" \
--files ./* \
--json
paddlerec/core/engine/cluster/cluster.py:
@@ -18,6 +18,7 @@ from __future__ import unicode_literals
import copy
import os
import subprocess
import warnings
from paddlerec.core.engine.engine import Engine
from paddlerec.core.factory import TrainerFactory
@@ -26,24 +27,35 @@ from paddlerec.core.utils import envs
class ClusterEngine(Engine):
def __init_impl__(self):
self.role = envs.get_runtime_environ("engine_role")
if self.role == "WORKER":
return
abs_dir = os.path.dirname(os.path.abspath(__file__))
os.environ["abs_dir"] = str(abs_dir)
self.backend = envs.get_runtime_environ("backend")
if not self.backend:
self.backend = ""
self.backend = self.backend.upper()
if self.backend == "PADDLECLOUD":
self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh")
elif self.backend == "KUBERNETES":
self.submit_script = os.path.join(abs_dir, "k8s/cluster.sh")
else:
raise ValueError("{} is not supported now".format(
self.backend))
def start_worker_procs(self):
trainer = TrainerFactory.create(self.trainer)
trainer.run()
def start_master_procs(self):
if self.backend == "PADDLECLOUD":
self.paddlecloud_env_check()
elif self.backend == "KUBERNETES":
self.kubernetes_env_check()
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
@@ -55,21 +67,229 @@ class ClusterEngine(Engine):
@staticmethod
def workspace_replace():
workspace = envs.get_runtime_environ("workspace")
for k, v in os.environ.items():
v = v.replace("{workspace}", workspace)
os.environ[k] = str(v)
def run(self):
if self.role == "MASTER":
self.start_master_procs()
elif self.role == "WORKER":
self.start_worker_procs()
else:
raise ValueError("role {} error, must be in MASTER/WORKER".format(
self.role))
def paddlecloud_env_check(self):
# get fleet mode
fleet_mode = envs.get_runtime_environ("fleet_mode")
# get device
device = envs.get_runtime_environ("device")
# get cluster type
cluster_type = envs.get_runtime_environ("cluster_type")
cluster_env_check_tool = None
if cluster_type.upper() == "MPI":
if device == "CPU" and fleet_mode == "PS":
cluster_env_check_tool = PaddleCloudMpiEnv()
else:
raise ValueError(
"PaddleCloud with MPI does not support GPU training, check your config"
)
elif cluster_type.upper() == "K8S":
if fleet_mode == "PS":
if device == "CPU":
raise ValueError(
"PS-CPU on paddlecloud is not supported at this time, coming soon"
)
elif device == "GPU":
raise ValueError(
"PS-GPU on paddlecloud is not supported at this time, coming soon"
)
if fleet_mode == "COLLECTIVE":
if device == "GPU":
cluster_env_check_tool = CloudCollectiveEnv()
elif device == "CPU":
raise ValueError(
"Unexpected config -> device: CPU with fleet_mode: Collective, check your config"
)
else:
raise ValueError("cluster_type {} error, must in MPI/K8S".format(
cluster_type))
cluster_env_check_tool.env_check()
cluster_env_check_tool.env_set()
def kubernetes_env_check(self):
pass
class ClusterEnvBase(object):
def __init__(self):
# get backend env
backend_yaml = envs.get_runtime_environ("backend_yaml")
_env = envs.load_yaml(backend_yaml)
self.backend_env = envs.flatten_environs(_env, ".")
self.cluster_env = {}
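The backend yaml is flattened with "." as the separator, which is why the lookups below use dotted keys such as "config.fs_name" and "submit.ak". A minimal sketch of the assumed behavior of envs.flatten_environs (not the actual paddlerec.core.utils.envs implementation):

def flatten_environs(d, sep=".", prefix=""):
    # Recursively flatten nested dicts into dotted keys.
    flat = {}
    for k, v in d.items():
        key = prefix + sep + k if prefix else k
        if isinstance(v, dict):
            flat.update(flatten_environs(v, sep, key))
        else:
            flat[key] = v
    return flat

# {"config": {"fs_name": "afs://xxx.com"}} -> {"config.fs_name": "afs://xxx.com"}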
def env_check(self):
# check common env
# fs_name & fs_ugi
self.cluster_env["FS_NAME"] = self.backend_env.get("config.fs_name",
"")
self.cluster_env["FS_UGI"] = self.backend_env.get("config.fs_ugi", "")
if self.cluster_env["FS_NAME"] == "" or self.cluster_env[
"FS_UGI"] == "":
raise ValueError(
"No -- FS_UGI or FS_NAME -- found in your backend.yaml, please check."
)
# output_path
self.cluster_env["OUTPUT_PATH"] = self.backend_env.get(
"config.output_path", "")
if self.cluster_env["OUTPUT_PATH"] == "":
warnings.warn(
"Job output_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# paddle_version
self.cluster_env["PADDLE_VERSION"] = self.backend_env.get(
"config.paddle_version", "1.7.2")
# communicator
self.cluster_env[
"FLAGS_communicator_is_sgd_optimizer"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_is_sgd_optimizer", 0)
self.cluster_env[
"FLAGS_communicator_send_queue_size"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_send_queue_size", 5)
self.cluster_env[
"FLAGS_communicator_thread_pool_size"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_thread_pool_size", 32)
self.cluster_env[
"FLAGS_communicator_max_merge_var_num"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_max_merge_var_num", 5)
self.cluster_env[
"FLAGS_communicator_max_send_grad_num_before_recv"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_max_send_grad_num_before_recv",
5)
self.cluster_env["FLAGS_communicator_fake_rpc"] = self.backend_env.get(
"config.communicator.FLAGS_communicator_fake_rpc", 0)
self.cluster_env["FLAGS_rpc_retry_times"] = self.backend_env.get(
"config.communicator.FLAGS_rpc_retry_times", 3)
# ak & sk
self.cluster_env["AK"] = self.backend_env.get("submit.ak", "")
self.cluster_env["SK"] = self.backend_env.get("submit.sk", "")
if self.cluster_env["AK"] == "" or self.cluster_env["SK"] == "":
raise ValueError(
"No -- AK or SK -- found in your backend.yaml, please check.")
# priority
self.cluster_env["PRIORITY"] = self.backend_env.get("submit.priority",
"high")
# job name
self.cluster_env["JOB_NAME"] = self.backend_env.get(
"submit.job_name", "PaddleRecClusterJob")
# group
self.cluster_env["GROUP_NAME"] = self.backend_env.get("submit.group",
"paddle")
# start_cmd
self.cluster_env["START_CMD"] = self.backend_env.get(
"submit.start_cmd", "python -m paddlerec.run -m config.yaml")
# files
self.cluster_env["FILES"] = self.backend_env.get("submit.files", "")
if self.cluster_env["FILES"] == "":
raise ValueError(
"No -- files -- found in your backend.yaml, please check.")
def env_set(self):
envs.set_runtime_environs(self.cluster_env)
flattens = envs.flatten_environs(self.cluster_env)
print(envs.pretty_print_envs(flattens, ("Cluster Envs", "Value")))
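env_set exports everything in cluster_env as runtime environs, and the shell generators shown earlier substitute those values into the <$ ... $> template placeholders. A few representative mappings, with hypothetical values:

# backend.yaml key          exported env       template placeholder          example
# config.fs_name         -> FS_NAME         -> <$ FS_NAME $>                 afs://xxx.com
# config.paddle_version  -> PADDLE_VERSION  -> <$ PADDLEPADDLE_VERSION $>    1.7.2
# submit.nodes           -> MPI_NODES       -> <$ MPI_NODES $>               2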
class PaddleCloudMpiEnv(ClusterEnvBase):
def __init__(self):
super(PaddleCloudMpiEnv, self).__init__()
def env_check(self):
super(PaddleCloudMpiEnv, self).env_check()
# check mpi env
self.cluster_env["DISTRIBUTE_MODE"] = "PS_CPU_MPI"
# train_data_path
self.cluster_env["TRAIN_DATA_PATH"] = self.backend_env.get(
"config.train_data_path", "")
if self.cluster_env["TRAIN_DATA_PATH"] == "":
raise ValueError(
"No -- TRAIN_DATA_PATH -- found in your backend.yaml, please check."
)
# test_data_path
self.cluster_env["TEST_DATA_PATH"] = self.backend_env.get(
"config.test_data_path", "")
if self.cluster_env["TEST_DATA_PATH"] == "":
warnings.warn(
"Job test_data_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# thirdparty_path
self.cluster_env["THIRDPARTY_PATH"] = self.backend_env.get(
"config.thirdparty_path", "")
if self.cluster_env["THIRDPARTY_PATH"] == "":
warnings.warn(
"Job thirdparty_path not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
# nodes
self.cluster_env["MPI_NODES"] = self.backend_env.get("submit.nodes", 1)
class PaddleCloudK8sEnv(ClusterEnvBase):
def __init__(self):
super(PaddleCloudK8sEnv, self).__init__()
def env_check(self):
super(PaddleCloudK8sEnv, self).env_check()
# check afs_remote_mount_point
self.cluster_env["AFS_REMOTE_MOUNT_POINT"] = self.backend_env.get(
"config.afs_remote_mount_point", "")
if self.cluster_env["AFS_REMOTE_MOUNT_POINT"] == "":
warnings.warn(
"Job afs_remote_mount_point not set! Please check your backend yaml.",
category=UserWarning,
stacklevel=2)
warnings.warn(
"The remote mount point will be mounted to the ./afs/",
category=UserWarning,
stacklevel=2)
class CloudCollectiveEnv(PaddleCloudK8sEnv):
def __init__(self):
super(CloudCollectiveEnv, self).__init__()
def env_check(self):
super(CloudCollectiveEnv, self).env_check()
self.cluster_env["DISTRIBUTE_MODE"] = "COLLECTIVE_GPU_K8S"
self.cluster_env["K8S_TRAINERS"] = self.backend_env.get(
"submit.k8s_trainers", 1)
self.cluster_env["K8S_GPU_CARD"] = self.backend_env.get(
"submit.k8s_gpu_card", 1)
self.cluster_env["K8S_CPU_CORES"] = self.backend_env.get(
"submit.k8s_cpu_cores", 1)
@@ -118,6 +118,7 @@ class QueueDataset(DatasetBase):
dataset.set_batch_size(batch_size)
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
@@ -125,7 +126,7 @@ class QueueDataset(DatasetBase):
if context["engine"] == EngineMode.LOCAL_CLUSTER:
file_list = split_files(file_list, context["fleet"].worker_index(),
context["fleet"].worker_num())
print("File_list: {}".format(file_list))
dataset.set_filelist(file_list)
for model_dict in context["phases"]:
if model_dict["dataset_name"] == dataset_name:
......
@@ -42,7 +42,7 @@ def dataloader_by_name(readerclass,
if context["engine"] == EngineMode.LOCAL_CLUSTER:
files = split_files(files, context["fleet"].worker_index(),
context["fleet"].worker_num())
print("file_list : {}".format(files))
reader = reader_class(yaml_file)
reader.init()
......
backend.yaml (cluster submission example):
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
workspace: "./"
backend: "PaddleCloud"
cluster_type: k8s # options: mpi / k8s
config:
fs_name: "afs://xxx.com"
fs_ugi: "usr,pwd"
output_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
# for mpi
train_data_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
test_data_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
thirdparty_path: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
paddle_version: "1.7.2" # 填写paddle官方版本号 >= 1.7.2
# for k8s
afs_remote_mount_point: "" # 填远程地址,如afs:/user/your/path/ 则此处填 /user/your/path
# Low-level Paddle distributed hyperparameters; leave unchanged unless you have special needs
communicator:
FLAGS_communicator_is_sgd_optimizer: 0
FLAGS_communicator_send_queue_size: 5
FLAGS_communicator_thread_pool_size: 32
FLAGS_communicator_max_merge_var_num: 5
FLAGS_communicator_max_send_grad_num_before_recv: 5
FLAGS_communicator_fake_rpc: 0
FLAGS_rpc_retry_times: 3
submit:
ak: ""
sk: ""
priority: "high"
job_name: "PaddleRec_CTR"
group: ""
start_cmd: "python -m paddlerec.run -m ./config.yaml"
files: ./*.py ./*.yaml
# for mpi ps-cpu
nodes: 2
# for k8s gpu
k8s_trainers: 2
k8s_gpu_card: 1
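For context, the master branch of cluster_engine in run.py below loads this file through args.backend. Assuming PaddleRec's -b/--backend command-line flag, a submission from the model directory would look like:

# hypothetical invocation; the backend yaml is assumed to be passed via -b/--backend
python -m paddlerec.run -m ./config.yaml -b backend.yaml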
config.yaml (runner section):
@@ -80,6 +80,28 @@ runner:
init_model_path: "increment_dnn" # load model path
phases: [phase2]
- name: ps_cluster
class: cluster_train
epochs: 2
device: cpu
fleet_mode: ps
save_checkpoint_interval: 1 # checkpoint save interval, in epochs
save_checkpoint_path: "increment_dnn" # save checkpoint path
init_model_path: "" # load model path
print_interval: 1
phases: [phase1]
- name: collective_cluster
class: cluster_train
epochs: 2
device: gpu
fleet_mode: collective
save_checkpoint_interval: 1 # checkpoint save interval, in epochs
save_checkpoint_path: "increment_dnn" # save checkpoint path
init_model_path: "" # load model path
print_interval: 1
phases: [phase1]
# runner will run all the phases in each epoch
phase:
- name: phase1
......
paddlerec/run.py:
@@ -38,7 +38,7 @@ def engine_registry():
engines["TRANSPILER"]["TRAIN"] = single_train_engine
engines["TRANSPILER"]["INFER"] = single_infer_engine
engines["TRANSPILER"]["LOCAL_CLUSTER_TRAIN"] = local_cluster_engine
engines["TRANSPILER"]["CLUSTER_TRAIN"] = cluster_engine
engines["PSLIB"]["TRAIN"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER_TRAIN"] = local_mpi_engine
engines["PSLIB"]["CLUSTER_TRAIN"] = cluster_mpi_engine
@@ -111,8 +111,8 @@ def get_engine(args, running_config, mode):
engine = running_config.get(engine_class, None)
if engine is None:
raise ValueError("cannot find {} in engine_class, please check".format(
engine))
device = running_config.get(engine_device, None)
engine = engine.upper()
@@ -262,15 +262,48 @@ def single_infer_engine(args):
def cluster_engine(args):
def master():
from paddlerec.core.engine.cluster.cluster import ClusterEngine
# Get fleet_mode & device
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
device = device.upper()
fleet_mode = fleet_mode.upper()
if fleet_mode == "COLLECTIVE" and device != "GPU":
raise ValueError("COLLECTIVE can not be used without GPU")
# Get Thread nums
model_envs = envs.load_yaml(args.model)
phases_class = ".".join(["runner", mode, "phases"])
phase_names = run_extras.get(phases_class)
phases = []
all_phases = model_envs.get("phase")
if phase_names is None:
phases = all_phases
else:
for phase in all_phases:
if phase["name"] in phase_names:
phases.append(phase)
thread_num = []
for phase in phases:
thread_num.append(int(phase["thread_num"]))
max_thread_num = max(thread_num)
backend_envs = envs.load_yaml(args.backend)
flattens = envs.flatten_environs(backend_envs, "_")
flattens["engine_role"] = "MASTER" flattens["engine_role"] = "MASTER"
flattens["engine_mode"] = envs.get_runtime_environ("mode") flattens["engine_mode"] = envs.get_runtime_environ("mode")
flattens["engine_run_config"] = args.model flattens["engine_run_config"] = args.model
flattens["engine_temp_path"] = tempfile.mkdtemp() flattens["max_thread_num"] = max_thread_num
flattens["fleet_mode"] = fleet_mode
flattens["device"] = device
flattens["backend_yaml"] = args.backend
envs.set_runtime_environs(flattens) envs.set_runtime_environs(flattens)
ClusterEngine.workspace_replace()
print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))
launch = ClusterEngine(None, args.model) launch = ClusterEngine(None, args.model)
return launch return launch
@@ -278,40 +311,29 @@ def cluster_engine(args):
def worker(mode):
if not mode:
raise ValueError("mode: {} can not be recognized".format(mode))
from paddlerec.core.engine.cluster.cluster import ClusterEngine
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
strategy_class = ".".join(["runner", mode, "distribute_strategy"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
distributed_strategy = run_extras.get(strategy_class, "async")
executor_mode = "train"
device = device.upper()
fleet_mode = fleet_mode.upper()
if fleet_mode == "COLLECTIVE" and device != "GPU":
raise ValueError("COLLECTIVE can not be used without GPU")
cluster_envs = {}
cluster_envs["fleet_mode"] = fleet_mode
cluster_envs["engine_role"] = "WORKER"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.executor_mode"] = executor_mode
@@ -321,15 +343,15 @@ def cluster_engine(args):
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to run model: {}".format(
trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
launch = ClusterEngine(None, args.model)
return launch
role = os.getenv("PADDLE_PADDLEREC_ROLE", "MASTER")
if role == "WORKER":
mode = os.getenv("mode", None)
return worker(mode)
else:
return master()
......
setup.py:
# -*- coding: utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -69,7 +70,7 @@ def build(dirname):
'Criteo_data/sample_data/train/*'
]
engine_copy = ['*/*.sh', '*/*.template']
for package in packages:
if package.startswith("paddlerec.models."):
package_data[package] = models_copy
......