Commit c8888222 authored by liuyuhui

Merge branch 'master' of https://github.com/PaddlePaddle/PaddleRec into fix_collective_files_partition
......@@ -20,7 +20,7 @@ python -m paddlerec.run -m paddlerec.models.xxx.yyy
For example, to launch the default configuration of the `word2vec` model under `recall`:
```shell
-python -m paddlerec.run -m models/recall/word2vec
+python -m paddlerec.run -m models/recall/word2vec/config.yaml
```
### 2. Launch a built-in model with a customized configuration
......
......@@ -259,3 +259,133 @@ auc_var, batch_auc_var, auc_states = fluid.layers.auc(
```
After building the network as described above, training ultimately yields two key metrics: `avg_cost` and `auc`.
## Online Learning (Streaming Training): Launching and Configuring the Task
### Introduction to streaming training
Streaming training receives and processes data in sequence: each time a sample arrives, the model predicts on it, updates itself with it, and then moves on to the next sample. In scenarios such as news feeds, short videos, and e-commerce, large volumes of new data are added every day, and the data added each day (each moment) is predicted on, and used to update, the model from the previous day (previous moment).
Large-scale streaming training requires corresponding support from the deep learning framework, namely:
* Support for large-scale distributed training: the data volume is huge, so strong distributed-training and scaling capabilities are needed to meet the timeliness requirements of training.
* Support for ultra-large embeddings, on the order of billions or even hundreds of billions of entries, together with a practical way to export parameters, so that model parameters can be dumped quickly and handed off to other online systems.
* Hash-mapped embedding feature IDs with no constraints on ID encoding, automatic table growth, feature admission (features that did not exist before can be created under appropriate conditions), and periodic eviction (expired features can be cleaned up under a configurable policy); in other words, both admission and eviction policies.
* Finally, a complete streaming trainer.py built on the framework, implementing the full streaming-training workflow; a minimal sketch of such a loop follows this list.
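As referenced above, the predict-then-update loop can be sketched as follows. This is a minimal, framework-agnostic illustration; `data_stream`, `model.predict`, `model.update`, and `model.save_checkpoint` are hypothetical names used only for illustration, not PaddleRec APIs:
```python
import time


def streaming_train(model, data_stream, dump_interval_s=3600):
    """Sketch of online learning: predict on each incoming sample, then update."""
    last_dump = time.time()
    for sample in data_stream:       # samples arrive in time order
        model.predict(sample)        # serve/evaluate with the current model first
        model.update(sample)         # then update the model with the sample
        if time.time() - last_dump > dump_interval_s:
            model.save_checkpoint()  # periodically hand parameters to online systems
            last_dump = time.time()
```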
### Training a model with ctr-dnn online learning
PaddleRec implements this streaming-training workflow on top of the distributed-training capabilities of the PaddlePaddle framework, for your reference and use. We adapted `models/rank/ctr-dnn` into an online_training version to make it easier to understand and follow.
**Note**
1. Online learning requires the latest develop (nightly) build of Paddle. You can obtain it from https://www.paddlepaddle.org.cn/documentation/docs/zh/install/Tables.html#whl-dev; uninstall the currently installed Paddle first, then download the wheel matching your Python environment.
2. Online learning also requires the latest develop version of PaddleRec. Get it via git clone https://github.com/PaddlePaddle/PaddleRec.git and install it yourself; a sketch of both installs follows this list.
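Concretely, the two installs might look like the sketch below. The wheel filename is a placeholder that depends on your Python environment, and the from-source install command assumes the repository's setup.py:
```shell
# 1. replace the stable Paddle wheel with a develop (nightly) build
pip uninstall -y paddlepaddle
pip install paddlepaddle-<develop-version>-cp37-cp37m-linux_x86_64.whl

# 2. install the latest PaddleRec from source
git clone https://github.com/PaddlePaddle/PaddleRec.git
cd PaddleRec
python setup.py install
```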
### How to launch
1. In config.yaml, set hyper_parameters.distributed_embedding=1 to enable the large-scale sparse mode.
2. In config.yaml, change `single_cpu_train` in mode: [single_cpu_train, single_cpu_infer] to online_learning_cluster, which selects the run mode for online learning (steps 1 and 2 are consolidated in the YAML sketch after this list).
3. Prepare the training data. The online-learning mode used by ctr-dnn trains at day granularity, with each day further split into 24 hours, so the training data must be organized into a day/hour directory structure.
Taking two days of training data, 2020-08-10 and 2020-08-11, as an example, the directory structure to prepare is as follows:
```
train_data/
|-- 20200810
| |-- 00
| | `-- train.txt
| |-- 01
| | `-- train.txt
| |-- 02
| | `-- train.txt
| |-- 03
| | `-- train.txt
| |-- 04
| | `-- train.txt
| |-- 05
| | `-- train.txt
| |-- 06
| | `-- train.txt
| |-- 07
| | `-- train.txt
| |-- 08
| | `-- train.txt
| |-- 09
| | `-- train.txt
| |-- 10
| | `-- train.txt
| |-- 11
| | `-- train.txt
| |-- 12
| | `-- train.txt
| |-- 13
| | `-- train.txt
| |-- 14
| | `-- train.txt
| |-- 15
| | `-- train.txt
| |-- 16
| | `-- train.txt
| |-- 17
| | `-- train.txt
| |-- 18
| | `-- train.txt
| |-- 19
| | `-- train.txt
| |-- 20
| | `-- train.txt
| |-- 21
| | `-- train.txt
| |-- 22
| | `-- train.txt
| `-- 23
| `-- train.txt
`-- 20200811
|-- 00
| `-- train.txt
|-- 01
| `-- train.txt
|-- 02
| `-- train.txt
|-- 03
| `-- train.txt
|-- 04
| `-- train.txt
|-- 05
| `-- train.txt
|-- 06
| `-- train.txt
|-- 07
| `-- train.txt
|-- 08
| `-- train.txt
|-- 09
| `-- train.txt
|-- 10
| `-- train.txt
|-- 11
| `-- train.txt
|-- 12
| `-- train.txt
|-- 13
| `-- train.txt
|-- 14
| `-- train.txt
|-- 15
| `-- train.txt
|-- 16
| `-- train.txt
|-- 17
| `-- train.txt
|-- 18
| `-- train.txt
|-- 19
| `-- train.txt
|-- 20
| `-- train.txt
|-- 21
| `-- train.txt
|-- 22
| `-- train.txt
`-- 23
`-- train.txt
```
4. Once the data is ready, streaming training follows the standard training workflow:
```shell
python -m paddlerec.run -m models/rank/ctr-dnn/config.yaml
```
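Putting steps 1 and 2 together, the relevant edits to config.yaml would look like this sketch (only the changed keys are shown; all other keys keep their defaults):
```yaml
hyper_parameters:
  distributed_embedding: 1   # step 1: enable the large-scale sparse mode

mode: [online_learning_cluster, single_cpu_infer]   # step 2: replaces single_cpu_train
```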
......@@ -49,6 +49,7 @@ hyper_parameters:
   sparse_feature_dim: 9
   dense_input_dim: 13
   fc_sizes: [512, 256, 128, 32]
+  distributed_embedding: 0

 # select runner by name
 mode: [single_cpu_train, single_cpu_infer]
......@@ -90,6 +91,18 @@ runner:
   print_interval: 1
   phases: [phase1]

+- name: online_learning_cluster
+  class: cluster_train
+  runner_class_path: "{workspace}/online_learning_runner.py"
+  epochs: 2
+  device: cpu
+  fleet_mode: ps
+  save_checkpoint_interval: 1 # save model interval of epochs
+  save_checkpoint_path: "increment_dnn" # save checkpoint path
+  init_model_path: "" # load model path
+  print_interval: 1
+  phases: [phase1]
+
 - name: collective_cluster
   class: cluster_train
   epochs: 2
......
......@@ -25,8 +25,16 @@ class Model(ModelBase):
         ModelBase.__init__(self, config)

     def _init_hyper_parameters(self):
-        self.is_distributed = True if envs.get_fleet_mode().upper(
-        ) == "PSLIB" else False
+        self.is_distributed = False
+        self.distributed_embedding = False
+
+        if envs.get_fleet_mode().upper() == "PSLIB":
+            self.is_distributed = True
+
+        if envs.get_global_env("hyper_parameters.distributed_embedding",
+                               0) == 1:
+            self.distributed_embedding = True
+
         self.sparse_feature_number = envs.get_global_env(
             "hyper_parameters.sparse_feature_number")
         self.sparse_feature_dim = envs.get_global_env(
......@@ -40,14 +48,26 @@ class Model(ModelBase):
         self.label_input = self._sparse_data_var[0]

         def embedding_layer(input):
-            emb = fluid.layers.embedding(
-                input=input,
-                is_sparse=True,
-                is_distributed=self.is_distributed,
-                size=[self.sparse_feature_number, self.sparse_feature_dim],
-                param_attr=fluid.ParamAttr(
-                    name="SparseFeatFactors",
-                    initializer=fluid.initializer.Uniform()), )
+            if self.distributed_embedding:
+                emb = fluid.contrib.layers.sparse_embedding(
+                    input=input,
+                    size=[
+                        self.sparse_feature_number, self.sparse_feature_dim
+                    ],
+                    param_attr=fluid.ParamAttr(
+                        name="SparseFeatFactors",
+                        initializer=fluid.initializer.Uniform()))
+            else:
+                emb = fluid.layers.embedding(
+                    input=input,
+                    is_sparse=True,
+                    is_distributed=self.is_distributed,
+                    size=[
+                        self.sparse_feature_number, self.sparse_feature_dim
+                    ],
+                    param_attr=fluid.ParamAttr(
+                        name="SparseFeatFactors",
+                        initializer=fluid.initializer.Uniform()))
             emb_sum = fluid.layers.sequence_pool(input=emb, pool_type='sum')
             return emb_sum
......
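For context on the branch above: `fluid.layers.embedding` looks rows up in a bounded table, so ids must fall in `[0, sparse_feature_number)`, whereas `fluid.contrib.layers.sparse_embedding` treats ids as hashed int64 keys whose rows live on the parameter server and are created on first access, matching the admission behavior described earlier. A standalone sketch with illustrative sizes (actually running the sparse variant requires a fleet/parameter-server setup):
```python
import paddle.fluid as fluid

ids = fluid.data(name="ids", shape=[None, 1], dtype="int64", lod_level=1)

# Bounded, dense-table lookup: every id must lie in [0, 1000001).
emb_dense = fluid.layers.embedding(
    input=ids, size=[1000001, 9], is_sparse=True)

# Distributed-table lookup: ids are hashed int64 keys; rows are created
# on demand, so no global id encoding is required.
emb_dist = fluid.contrib.layers.sparse_embedding(
    input=ids, size=[1000001, 9])
```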
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import warnings
import numpy as np
import logging
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.metric import Metric
from paddlerec.core.trainers.framework.runner import RunnerBase
logging.basicConfig(
    format='%(asctime)s - %(levelname)s: %(message)s', level=logging.INFO)


class OnlineLearningRunner(RunnerBase):
    def __init__(self, context):
        print("Running OnlineLearningRunner.")

    def run(self, context):
        epochs = int(
            envs.get_global_env("runner." + context["runner_name"] +
                                ".epochs"))
        model_dict = context["env"]["phase"][0]
        model_class = context["model"][model_dict["name"]]["model"]
        metrics = model_class._metrics

        # Build one dataset per day of training data.
        # NOTE: `days`, `path`, `fleet`, `hdfs_ls`, `create_dataset` and
        # `use_var` are not defined in the portion of the file shown in this
        # diff; they are assumed to come from the surrounding trainer context.
        dataset_list = []
        dataset_index = 0
        for day_index in range(len(days)):
            day = days[day_index]
            cur_path = "%s/%s" % (path, str(day))
            filelist = fleet.split_files(hdfs_ls([cur_path]))
            dataset = create_dataset(use_var, filelist)
            dataset_list.append(dataset)
            dataset_index += 1

        dataset_index = 0
        for epoch in range(len(days)):
            day = days[epoch]  # one "epoch" consumes one day of data, in order
            begin_time = time.time()
            result = self._run(context, model_dict)
            end_time = time.time()
            seconds = end_time - begin_time
            message = "epoch {} done, use time: {}".format(epoch, seconds)

            # TODO: wait for PaddleCloudRoleMaker to support gloo
            from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
            if context["fleet"] is not None and isinstance(context["fleet"],
                                                           GeneralRoleMaker):
                metrics_result = []
                for key in metrics:
                    if isinstance(metrics[key], Metric):
                        _str = metrics[key].calc_global_metrics(
                            context["fleet"],
                            context["model"][model_dict["name"]]["scope"])
                        metrics_result.append(_str)
                    elif result is not None:
                        _str = "{}={}".format(key, result[key])
                        metrics_result.append(_str)
                if len(metrics_result) > 0:
                    message += ", global metrics: " + ", ".join(metrics_result)
            print(message)

            # Save a checkpoint after each day's pass.
            with fluid.scope_guard(context["model"][model_dict["name"]][
                    "scope"]):
                train_prog = context["model"][model_dict["name"]][
                    "main_program"]
                startup_prog = context["model"][model_dict["name"]][
                    "startup_program"]
                with fluid.program_guard(train_prog, startup_prog):
                    self.save(epoch, context, True)

        context["status"] = "terminal_pass"
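The helpers referenced by the runner (`days`, `path`, `hdfs_ls`, `create_dataset`, `use_var`) are elided from this diff. As one illustration, `create_dataset` could be implemented with Paddle's dataset factory roughly as follows (an assumption for illustration, not the code from this commit; the `reader.py` pipe command is hypothetical):
```python
import paddle.fluid as fluid


def create_dataset(use_var, filelist, batch_size=32, thread_num=4):
    # A QueueDataset streams files from disk rather than loading them all
    # into memory, which suits day/hour-partitioned online-learning data.
    dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
    dataset.set_use_var(use_var)                  # the model's input variables
    dataset.set_pipe_command("python reader.py")  # hypothetical per-line parser
    dataset.set_batch_size(batch_size)
    dataset.set_thread(thread_num)
    dataset.set_filelist(filelist)
    return dataset
```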
wget https://paddlerec.bj.bcebos.com/utils/tree_build_utils.tar.gz --no-check-certificate
# input_path: path to the embedding file
# emb_shape: dimension of the value in each embedding key-value pair
# emb format requirement: embedding_id(int64),embedding(float),embedding(float),......,embedding(float)
# cluster_threads: number of threads used for clustering while building the tree
python_172_anytree/bin/python -u main.py --input_path=./gen_emb/item_emb.txt --output_path=./ --emb_shape=24 --cluster_threads=4
The tree-building pipeline is: 1) read the embeddings -> 2) k-means clustering -> 3) organize the clustering result into a tree -> 4) derive the four files the model needs from the tree structure:
1 Layer_list: records which nodes are on each layer. Used for training.
2 Travel_list: records the travel path of each leaf node. Used for training.
3 Tree_Info: records the information of each node, mainly: whether it is an item / its item_id, its layer, its parent node, and its child nodes. Used for retrieval.
4 Tree_Embedding: records the embeddings of all nodes. Used for training and retrieval.
Note whether the items in the training input are the original item ids used before tree building, tree-based node ids, or leaf-based leaf ids; in tdm_reader.py you can load a dictionary to perform the mapping.
The output folder produced by the internal tree-building tool contains a mapping file named id2nodeid.txt, whose format is "hash value" + "tree node ID" + "leaf node ID" (the index of the leaf node, which is the input required by the tdm_sampler op).
Another file, id2bidword.txt, also holds a mapping, in the format "hash value" + "original item ID"; this file stores information for leaf nodes only.
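For the id mapping mentioned above, loading id2nodeid.txt inside tdm_reader.py could look like this minimal sketch (hypothetical helper name; field layout as described: hash value, tree node ID, leaf node ID):
```python
def load_id_mapping(mapping_path="id2nodeid.txt"):
    """Map a hashed item id to its (tree node id, leaf id)."""
    mapping = {}
    with open(mapping_path) as f:
        for line in f:
            hash_value, node_id, leaf_id = line.split()
            mapping[hash_value] = (int(node_id), int(leaf_id))
    return mapping
```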