提交 12c654fe 编写于 作者: C chengmo

add tdm & trainer & engine

上级 7eaa453b
...@@ -34,6 +34,12 @@ class Model(object): ...@@ -34,6 +34,12 @@ class Model(object):
""" """
return self._metrics return self._metrics
def custom_preprocess(self):
    """
    Hook for model-specific work after exe.run(startup_program) and before run().

    The single-node trainer calls this right after it has executed the
    default startup program and before it builds the dataloader/dataset.
    The default implementation does nothing; subclasses may override it.
    """
    pass
def get_fetch_period(self): def get_fetch_period(self):
return self._fetch_interval return self._fetch_interval
...@@ -41,24 +47,30 @@ class Model(object): ...@@ -41,24 +47,30 @@ class Model(object):
name = name.upper() name = name.upper()
optimizers = ["SGD", "ADAM", "ADAGRAD"] optimizers = ["SGD", "ADAM", "ADAGRAD"]
if name not in optimizers: if name not in optimizers:
raise ValueError("configured optimizer can only supported SGD/Adam/Adagrad") raise ValueError(
"configured optimizer can only supported SGD/Adam/Adagrad")
if name == "SGD": if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001, self._namespace) reg = envs.get_global_env(
optimizer_i = fluid.optimizer.SGD(lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) "hyper_parameters.reg", 0.0001, self._namespace)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM": elif name == "ADAM":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True) optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
elif name == "ADAGRAD": elif name == "ADAGRAD":
optimizer_i = fluid.optimizer.Adagrad(lr) optimizer_i = fluid.optimizer.Adagrad(lr)
else: else:
raise ValueError("configured optimizer can only supported SGD/Adam/Adagrad") raise ValueError(
"configured optimizer can only supported SGD/Adam/Adagrad")
return optimizer_i return optimizer_i
def optimizer(self): def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self._namespace) learning_rate = envs.get_global_env(
optimizer = envs.get_global_env("hyper_parameters.optimizer", None, self._namespace) "hyper_parameters.learning_rate", None, self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" %learning_rate) optimizer = envs.get_global_env(
"hyper_parameters.optimizer", None, self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate) return self._build_optimizer(optimizer, learning_rate)
@abc.abstractmethod @abc.abstractmethod
......
...@@ -95,5 +95,6 @@ def user_define_engine(engine_yaml): ...@@ -95,5 +95,6 @@ def user_define_engine(engine_yaml):
train_dirname = os.path.dirname(train_location) train_dirname = os.path.dirname(train_location)
base_name = os.path.splitext(os.path.basename(train_location))[0] base_name = os.path.splitext(os.path.basename(train_location))[0]
sys.path.append(train_dirname) sys.path.append(train_dirname)
trainer_class = envs.lazy_instance_by_fliename(base_name, "UserDefineTraining") trainer_class = envs.lazy_instance_by_fliename(
base_name, "UserDefineTraining")
return trainer_class return trainer_class
...@@ -59,6 +59,8 @@ class SingleTrainer(TranspileTrainer): ...@@ -59,6 +59,8 @@ class SingleTrainer(TranspileTrainer):
def dataloader_train(self, context): def dataloader_train(self, context):
self._exe.run(fluid.default_startup_program()) self._exe.run(fluid.default_startup_program())
self.model.custom_preprocess()
reader = self._get_dataloader() reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs") epochs = envs.get_global_env("train.epochs")
...@@ -101,6 +103,8 @@ class SingleTrainer(TranspileTrainer): ...@@ -101,6 +103,8 @@ class SingleTrainer(TranspileTrainer):
def dataset_train(self, context): def dataset_train(self, context):
# run startup program at once # run startup program at once
self._exe.run(fluid.default_startup_program()) self._exe.run(fluid.default_startup_program())
self.model.custom_preprocess()
dataset = self._get_dataset() dataset = self._get_dataset()
epochs = envs.get_global_env("train.epochs") epochs = envs.get_global_env("train.epochs")
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import logging
import paddle.fluid as fluid
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
from fleetrec.core.trainers.single_trainer import SingleTrainer
from fleetrec.core.utils import envs
import numpy as np
# Module-level logging setup: install a timestamped record format on the root
# logger and make the shared "fluid" logger emit INFO-level messages.
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class TdmSingleTrainer(SingleTrainer):
    """Single-node trainer for TDM tree models.

    Compared with ``SingleTrainer`` this class adds a ``startup`` stage that
    runs the startup program once, optionally restores persistables, and
    optionally loads the TDM tree arrays from disk into the network's
    variables before training starts.
    """

    # Names of the network variables that hold tree data. These are exactly
    # the names ``tdm_prepare`` knows how to load.
    _TDM_PARAM_NAMES = (
        "TDM_Tree_Travel",
        "TDM_Tree_Layer",
        "TDM_Tree_Info",
        "TDM_Tree_Emb",
    )

    def processor_register(self):
        """Register one handler per pipeline status."""
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)
        # Dataset-based training is only selected on Linux; every other
        # platform falls back to the dataloader path.
        if envs.get_platform() == "LINUX":
            self.regist_context_processor('train_pass', self.dataset_train)
        else:
            self.regist_context_processor('train_pass', self.dataloader_train)
        self.regist_context_processor('infer_pass', self.infer)
        self.regist_context_processor('terminal_pass', self.terminal)

    def init(self, context):
        """Build the training network and optimizer, then cache fetch settings.

        Sets ``context['status']`` to ``'startup_pass'`` when done.
        """
        self.model.train_net()
        optimizer = self.model.optimizer()
        optimizer.minimize(self.model.get_cost_op())

        self.fetch_vars = []
        self.fetch_alias = []
        self.fetch_period = self.model.get_fetch_period()

        metrics = self.model.get_metrics()
        if metrics:
            self.fetch_vars = metrics.values()
            self.fetch_alias = metrics.keys()
        context['status'] = 'startup_pass'

    def startup(self, context):
        """Run the startup program and prepare the initial parameters.

        All options are read from the ``train.startup`` namespace under the
        ``single.*`` keys. Depending on them this method restores
        persistables from ``persistables_model_path``, loads the tree arrays
        into the network variables, and saves the resulting initial model to
        ``init_model_path``. Sets ``context['status']`` to ``'train_pass'``.
        """
        namespace = "train.startup"
        load_persistables = envs.get_global_env(
            "single.load_persistables", False, namespace)
        persistables_model_path = envs.get_global_env(
            "single.persistables_model_path", "", namespace)

        load_tree = envs.get_global_env(
            "single.load_tree", False, namespace)
        self.tree_layer_path = envs.get_global_env(
            "single.tree_layer_path", "", namespace)
        self.tree_travel_path = envs.get_global_env(
            "single.tree_travel_path", "", namespace)
        self.tree_info_path = envs.get_global_env(
            "single.tree_info_path", "", namespace)
        self.tree_emb_path = envs.get_global_env(
            "single.tree_emb_path", "", namespace)

        save_init_model = envs.get_global_env(
            "single.save_init_model", False, namespace)
        init_model_path = envs.get_global_env(
            "single.init_model_path", "", namespace)

        self._exe.run(fluid.default_startup_program())

        if load_persistables:
            # Load parameters from a Paddle binary (persistables) model.
            fluid.io.load_persistables(
                executor=self._exe,
                dirname=persistables_model_path,
                main_program=fluid.default_main_program())
            logger.info("Load persistables from \"{}\"".format(
                persistables_model_path))

        if load_tree:
            # Set the plain-text tree structure and data directly into the
            # network's Variables. A numpy-array initializer is deliberately
            # not used: the tree data can be very large, which would be a
            # performance risk.
            #
            # NOTE(review): this engine is registered for CPU only, so the
            # tensors are placed on CPU — confirm if GPU support is added.
            place = fluid.CPUPlace()
            for param_name in self._TDM_PARAM_NAMES:
                param_t = fluid.global_scope().find_var(
                    param_name).get_tensor()
                param_array = self.tdm_prepare(param_name)
                # Only the embedding table is floating point; the other
                # tree arrays are stored as int32.
                if param_name == 'TDM_Tree_Emb':
                    param_t.set(param_array.astype('float32'), place)
                else:
                    param_t.set(param_array.astype('int32'), place)

        if save_init_model:
            logger.info("Begin Save Init model.")
            fluid.io.save_persistables(
                executor=self._exe, dirname=init_model_path)
            logger.info("End Save Init model.")

        context['status'] = 'train_pass'

    def dataloader_train(self, context):
        """Train with a dataloader, printing metrics every 10 batches.

        Unlike the parent implementation, the startup program is not run
        here because ``startup`` has already executed it. Sets
        ``context['status']`` to ``'infer_pass'`` when all epochs finish.
        """
        reader = self._get_dataloader()
        epochs = envs.get_global_env("train.epochs")

        program = fluid.compiler.CompiledProgram(
            fluid.default_main_program()).with_data_parallel(
                loss_name=self.model.get_cost_op().name)

        # Build the fetch list and a matching "name: value" print template.
        metrics_varnames = []
        metrics_format = []
        metrics_format.append("{}: {{}}".format("epoch"))
        metrics_format.append("{}: {{}}".format("batch"))
        for name, var in self.model.get_metrics().items():
            metrics_varnames.append(var.name)
            metrics_format.append("{}: {{}}".format(name))
        metrics_format = ", ".join(metrics_format)

        for epoch in range(epochs):
            reader.start()
            batch_id = 0
            try:
                while True:
                    metrics_rets = self._exe.run(
                        program=program,
                        fetch_list=metrics_varnames)

                    metrics = [epoch, batch_id]
                    metrics.extend(metrics_rets)

                    if batch_id % 10 == 0 and batch_id != 0:
                        print(metrics_format.format(*metrics))
                    batch_id += 1
            except fluid.core.EOFException:
                # The reader signals the end of an epoch with EOFException.
                reader.reset()

        context['status'] = 'infer_pass'

    def dataset_train(self, context):
        """Train from a dataset and save a checkpoint after each epoch.

        Unlike the parent implementation, the startup program is not run
        here because ``startup`` has already executed it. Sets
        ``context['status']`` to ``'infer_pass'`` when all epochs finish.
        """
        dataset = self._get_dataset()
        epochs = envs.get_global_env("train.epochs")

        for i in range(epochs):
            self._exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=self.fetch_vars,
                fetch_info=self.fetch_alias,
                print_period=self.fetch_period)
            self.save(i, "train", is_fleet=False)
        context['status'] = 'infer_pass'

    def infer(self, context):
        """No inference is performed; move straight to the terminal stage."""
        context['status'] = 'terminal_pass'

    def terminal(self, context):
        """Print the recorded (epoch, dir) pairs and request exit."""
        # NOTE(review): ``increment_models`` is not set in this class;
        # presumably the inherited ``save`` populates it — confirm.
        for model in self.increment_models:
            print("epoch :{}, dir: {}".format(model[0], model[1]))
        context['is_exit'] = True

    def tdm_prepare(self, param_name):
        """Load the array that backs the tree variable ``param_name``.

        Args:
            param_name: one of "TDM_Tree_Travel", "TDM_Tree_Layer",
                "TDM_Tree_Info" or "TDM_Tree_Emb".

        Returns:
            The numpy array loaded for that variable.

        Raises:
            ValueError: if ``param_name`` is not one of the names above.
        """
        if param_name == "TDM_Tree_Travel":
            return self.tdm_travel_prepare()
        if param_name == "TDM_Tree_Layer":
            layer_array, _ = self.tdm_layer_prepare()
            return layer_array
        if param_name == "TDM_Tree_Info":
            return self.tdm_info_prepare()
        if param_name == "TDM_Tree_Emb":
            return self.tdm_emb_prepare()
        # Raising a bare string is itself a TypeError on Python 3, so raise
        # a real exception that carries the message.
        raise ValueError(
            " {} is not a special tdm param name".format(param_name))

    def tdm_travel_prepare(self):
        """load tdm tree param from npy/list file"""
        travel_array = np.load(self.tree_travel_path)
        logger.info("TDM Tree leaf node nums: {}".format(
            travel_array.shape[0]))
        return travel_array

    def tdm_emb_prepare(self):
        """load tdm tree param from npy/list file"""
        emb_array = np.load(self.tree_emb_path)
        logger.info("TDM Tree node nums from emb: {}".format(
            emb_array.shape[0]))
        return emb_array

    def tdm_layer_prepare(self):
        """load tdm tree param from npy/list file

        The layer file is text with one tree layer per line and node ids
        separated by commas; empty fields are ignored.

        Returns:
            A ``(layer_array, layer_list)`` tuple: ``layer_array`` is every
            node of every layer flattened into an ``(N, 1)`` array (still
            holding the text tokens read from the file), and ``layer_list``
            keeps the nodes grouped per layer.
        """
        layer_list = []
        layer_list_flat = []
        with open(self.tree_layer_path, 'r') as fin:
            for line in fin:
                # Drop the line terminator, then keep non-empty fields only.
                nodes = [
                    node for node in line.rstrip('\r\n').split(',') if node
                ]
                layer_list_flat.extend(nodes)
                layer_list.append(nodes)
        layer_array = np.array(layer_list_flat)
        layer_array = layer_array.reshape([-1, 1])
        logger.info("TDM Tree max layer: {}".format(len(layer_list)))
        logger.info("TDM Tree layer_node_num_list: {}".format(
            [len(i) for i in layer_list]))
        return layer_array, layer_list

    def tdm_info_prepare(self):
        """load tdm tree param from list file"""
        info_array = np.load(self.tree_info_path)
        return info_array
...@@ -17,6 +17,7 @@ def engine_registry(): ...@@ -17,6 +17,7 @@ def engine_registry():
cpu["TRANSPILER"]["SINGLE"] = single_engine cpu["TRANSPILER"]["SINGLE"] = single_engine
cpu["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine cpu["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
cpu["TRANSPILER"]["CLUSTER"] = cluster_engine cpu["TRANSPILER"]["CLUSTER"] = cluster_engine
cpu["TRANSPILER"]["TDM_SINGLE"] = tdm_single_engine
cpu["PSLIB"]["SINGLE"] = local_mpi_engine cpu["PSLIB"]["SINGLE"] = local_mpi_engine
cpu["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine cpu["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
cpu["PSLIB"]["CLUSTER"] = cluster_mpi_engine cpu["PSLIB"]["CLUSTER"] = cluster_mpi_engine
...@@ -34,7 +35,8 @@ def get_engine(engine, device): ...@@ -34,7 +35,8 @@ def get_engine(engine, device):
run_engine = d_engine[transpiler].get(engine, None) run_engine = d_engine[transpiler].get(engine, None)
if run_engine is None: if run_engine is None:
raise ValueError("engine {} can not be supported on device: {}".format(engine, device)) raise ValueError(
"engine {} can not be supported on device: {}".format(engine, device))
return run_engine return run_engine
...@@ -92,6 +94,21 @@ def single_engine(args): ...@@ -92,6 +94,21 @@ def single_engine(args):
return trainer return trainer
def tdm_single_engine(args):
    """Create the trainer used to run a TDM model on a single node.

    Publishes the trainer settings for ``args.model`` as runtime
    environment values and asks ``TrainerFactory`` to build the trainer.

    Args:
        args: parsed command-line arguments; ``model`` and ``device`` are read.

    Returns:
        The trainer created by ``TrainerFactory.create``.
    """
    print("use tdm single engine to run model: {}".format(args.model))

    # NOTE(review): the trainer key is "TDMSingleTrainer" while the trainer
    # class in this change is spelled TdmSingleTrainer — confirm how the
    # factory resolves this name.
    runtime_envs = {
        "train.trainer.trainer": "TDMSingleTrainer",
        "train.trainer.threads": "2",
        "train.trainer.engine": "single",
        "train.trainer.device": args.device,
        "train.trainer.platform": envs.get_platform(),
    }
    set_runtime_envs(runtime_envs, args.model)

    return TrainerFactory.create(args.model)
def cluster_engine(args): def cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model)) print("launch cluster engine with cluster to run model: {}".format(args.model))
...@@ -184,8 +201,10 @@ def get_abs_model(model): ...@@ -184,8 +201,10 @@ def get_abs_model(model):
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description='fleet-rec run') parser = argparse.ArgumentParser(description='fleet-rec run')
parser.add_argument("-m", "--model", type=str) parser.add_argument("-m", "--model", type=str)
parser.add_argument("-e", "--engine", type=str, choices=["single", "local_cluster", "cluster"]) parser.add_argument("-e", "--engine", type=str,
parser.add_argument("-d", "--device", type=str, choices=["cpu", "gpu"], default="cpu") choices=["single", "local_cluster", "cluster"])
parser.add_argument("-d", "--device", type=str,
choices=["cpu", "gpu"], default="cpu")
abs_dir = os.path.dirname(os.path.abspath(__file__)) abs_dir = os.path.dirname(os.path.abspath(__file__))
envs.set_runtime_environs({"PACKAGE_BASE": abs_dir}) envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
train:
trainer:
# for cluster training
strategy: "async"
epochs: 10
workspace: "fleetrec.models.recall.tdm"
reader:
batch_size: 32
class: "{workspace}/tdm_reader.py"
train_data_path: "{workspace}/data/train_data"
test_data_path: "{workspace}/data/test_data"
model:
models: "{workspace}/model.py"
hyper_parameters:
node_emb_size: 64
input_emb_size: 64
neg_sampling_list: [1, 2, 3, 4]
output_positive: True
topK: 1
learning_rate: 0.0001
act: "tanh"
optimizer: ADAM
tree_parameters:
max_layers: 4
node_nums: 26
leaf_node_nums: 13
layer_node_num_list: [2, 4, 7, 12]
child_nums: 2
startup:
single:
# Recommended: load the tree only once, save it as Paddle tensors, and afterwards warm-start from the saved Paddle model
load_persistables: False
persistables_model_path: ""
load_tree: True
tree_layer_path: ""
tree_travel_path: ""
tree_info_path: ""
tree_emb_path: ""
save_init_model: True
init_model_path: ""
cluster:
load_persistables: True
persistables_model_path: ""
save:
increment:
dirname: "increment"
epoch_interval: 2
save_last: True
inference:
dirname: "inference"
epoch_interval: 4
save_last: True
\ No newline at end of file
此差异已折叠。
# -*- coding=utf8 -*-
"""
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import print_function
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
class TrainReader(Reader):
    """Reader that turns one line of TDM training text into named features."""

    def reader(self, line):
        """
        Read the data line by line and process it as a dictionary

        Args:
            line: one text line with two tab-separated fields. The first
                field is a space-separated list of values for "input_emb";
                the second field is the value for "item_label".

        Returns:
            A generator function that yields a single sample, a list of
            ``(feature_name, values)`` pairs. Values are kept as the string
            tokens read from the line; no numeric conversion is done here.
        """
        def iterator():
            """
            This function needs to be implemented by the user, based on data format
            """
            features = line.strip('\n').split('\t')
            input_emb = features[0].split(' ')
            item_label = [features[1]]

            feature_name = ["input_emb", "item_label"]
            # Materialize the pairs: on Python 3 a bare zip() is a one-shot
            # iterator rather than a list of (name, values) tuples.
            yield list(zip(feature_name, [input_emb, item_label]))

        # Return the generator function itself, not the Reader base class.
        return iterator
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册