Commit b4063f51 authored by wangjiawei04

add InMemoryDataset support
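This commit adds an InMemoryDataset option alongside the existing QueueDataset: training files are listed from HDFS and loaded into memory before training starts. Below is a minimal sketch of the underlying paddle.fluid API that the new class wraps; feed_vars, file_list, and the "cat" pipe command are illustrative assumptions, not part of this commit.

import paddle.fluid as fluid

# Assumptions: `feed_vars` are the model's input variables and `file_list`
# points at text files already in the slot format the pipe command emits.
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_use_var(feed_vars)      # bind feed variables
dataset.set_batch_size(2)
dataset.set_thread(4)
dataset.set_pipe_command("cat")     # per-line preprocessing command
dataset.set_filelist(file_list)
dataset.load_into_memory()          # QueueDataset streams; InMemoryDataset pre-loads

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.train_from_dataset(program=fluid.default_main_program(), dataset=dataset)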

Parent 12fc8c82
......@@ -22,9 +22,9 @@ from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader
from paddlerec.core.trainer import EngineMode
from paddlerec.core.utils.util import split_files
from paddle.fluid.contrib.utils.hdfs_utils import HDFSClient
__all__ = ["DatasetBase", "DataLoader", "QueueDataset"]
__all__ = ["DatasetBase", "DataLoader", "QueueDataset", "InMemoryDataset"]
class DatasetBase(object):
"""R
......@@ -151,3 +151,68 @@ class QueueDataset(DatasetBase):
dataset.set_use_var(inputs)
break
return dataset
class InMemoryDataset(QueueDataset):
    def _get_dataset(self, dataset_name, context):
        # NOTE: dumps the full runner context to a local file; this is a
        # debugging aid for inspecting the fields read below.
        with open("context.txt", "w+") as fout:
            fout.write(str(context))
        name = "dataset." + dataset_name + "."
        reader_class = envs.get_global_env(name + "data_converter")
        reader_class_name = envs.get_global_env(name + "reader_class_name",
                                                "Reader")
        abs_dir = os.path.dirname(os.path.abspath(__file__))
        reader = os.path.join(abs_dir, '../../utils', 'dataset_instance.py')
        sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
        dense_slots = envs.get_global_env(name + "dense_slots", "").strip()

        # Read the HDFS connection settings from the InMemoryDataset entry of
        # the global dataset config; address and ugi are both required.
        for dataset_config in context["env"]["dataset"]:
            if dataset_config["type"] == "InMemoryDataset":
                hdfs_addr = dataset_config.get("hdfs_addr")
                hdfs_ugi = dataset_config.get("hdfs_ugi")
                hadoop_home = dataset_config.get("hadoop_home")
                if hdfs_addr is None or hdfs_ugi is None:
                    raise ValueError("hdfs_addr and hdfs_ugi not set")

        # Build the pipe command that converts each raw input line into the
        # slot format the dataset expects.
        if sparse_slots == "" and dense_slots == "":
            pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
                                                   reader_class_name,
                                                   context["config_yaml"])
        else:
            if sparse_slots == "":
                sparse_slots = "?"
            if dense_slots == "":
                dense_slots = "?"
            padding = envs.get_global_env(name + "padding", 0)
            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
                reader, "slot", "slot", context["config_yaml"], "fake",
                sparse_slots.replace(" ", "?"),
                dense_slots.replace(" ", "?"), str(padding))

        batch_size = envs.get_global_env(name + "batch_size")
        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
        dataset.set_batch_size(batch_size)
        dataset.set_pipe_command(pipe_cmd)
        dataset.set_hdfs_config(hdfs_addr, hdfs_ugi)

        # List the training files on HDFS; in local-cluster mode each worker
        # only keeps its own shard of the file list.
        train_data_path = envs.get_global_env(name + "data_path")
        hdfs_configs = {
            "fs.default.name": hdfs_addr,
            "hadoop.job.ugi": hdfs_ugi
        }
        hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
        file_list = [
            "{}/{}".format(hdfs_addr, x)
            for x in hdfs_client.lsr(train_data_path)
        ]
        if context["engine"] == EngineMode.LOCAL_CLUSTER:
            file_list = split_files(file_list,
                                    context["fleet"].worker_index(),
                                    context["fleet"].worker_num())
        dataset.set_filelist(file_list)

        # Bind the input variables of the phase that consumes this dataset,
        # then load the whole file list into memory.
        for model_dict in context["phases"]:
            if model_dict["dataset_name"] == dataset_name:
                model = context["model"][model_dict["name"]]["model"]
                thread_num = int(model_dict["thread_num"])
                dataset.set_thread(thread_num)
                if context["is_infer"]:
                    inputs = model._infer_data_var
                else:
                    inputs = model._data_var
                dataset.set_use_var(inputs)
                dataset.load_into_memory()
                break
        return dataset
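The dataset object returned by _get_dataset is later handed to Fluid's dataset training entry point by the runner. A hedged sketch of that consumption step follows; the executor, loss variable, and print period are assumptions, and this diff does not change the runner itself.

# Roughly what the runner does with the dataset built above (sketch):
exe.train_from_dataset(
    program=fluid.default_main_program(),
    dataset=dataset,           # object returned by _get_dataset
    fetch_list=[loss],         # assumed fetch target
    fetch_info=["loss"],
    print_period=10)
dataset.release_memory()       # free the pre-loaded samples after the pass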
......@@ -19,7 +19,7 @@ import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset
from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset, InMemoryDataset
__all__ = [
"NetworkBase", "SingleNetwork", "PSNetwork", "PslibNetwork",
......@@ -105,7 +105,11 @@ class SingleNetwork(NetworkBase):
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
elif type == "InMemoryDataset":
dataset_class = InMemoryDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
context["status"] = "startup_pass"
......@@ -187,7 +191,11 @@ class FineTuningNetwork(NetworkBase):
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
elif type == "InMemoryDataset":
dataset_class = InMemoryDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
context["status"] = "startup_pass"
......@@ -250,6 +258,11 @@ class PSNetwork(NetworkBase):
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
dataset["name"], context)
elif type == "InMemoryDataset":
dataset_class = InMemoryDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
dataset["name"], context)
context["status"] = "startup_pass"
def _build_strategy(self, context):
......@@ -348,6 +361,11 @@ class PslibNetwork(NetworkBase):
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
dataset["name"], context)
elif type == "InMemoryDataset":
dataset_class = InMemoryDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
dataset["name"], context)
context["status"] = "startup_pass"
def _server(self, context):
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# workspace
workspace: "models/rank/dnn"
# list of dataset
dataset:
- name: dataset_train # name of dataset to distinguish different datasets
batch_size: 2
type: InMemoryDataset # or DataLoader
data_path: "/user/paddle/wangjiawei04/paddlerec/dnn"
hdfs_addr: "afs://yinglong.afs.baidu.com:9902"
hdfs_ugi: "paddle,paddle"
hadoop_home: "~/.ndt/software/hadoop-xingtian/hadoop/"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
# hyper parameters of user-defined network
hyper_parameters:
# optimizer config
optimizer:
class: Adam
learning_rate: 0.001
strategy: async
# user-defined <key, value> pairs
sparse_inputs_slots: 27
sparse_feature_number: 1000001
sparse_feature_dim: 9
dense_input_dim: 13
fc_sizes: [512, 256, 128, 32]
# select runner by name
mode: [single_cpu_train]
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: single_cpu_train
class: train
# num of epochs
epochs: 4
# device to run training or infer
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment_dnn" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
print_interval: 10
phases: [phase1]
# runner will run all the phase in each epoch
phase:
- name: phase1
model: "{workspace}/model.py" # user-defined model
dataset_name: dataset_train # select dataset by name
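With this YAML placed under the workspace, training is launched through PaddleRec's usual entry point; the config path below is an assumed example:

python -m paddlerec.run -m models/rank/dnn/config.yaml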