Commit 28ed5927 authored by xiexionghang

commit kagle for paddle

Parent dcbf07c0
import copy
import yaml
import time
import datetime
import kagle_fs
import kagle_util
import kagle_layer
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
class Dataset(object):
__metaclass__=ABCMeta
def __init__(self, config):
self._datasets = {}
self._config = config
@abstractmethod
def check_ready(self, params):
pass
@abstractmethod
def load_dataset(self, params):
pass
@abstractmethod
def preload_dataset(self, params):
pass
@abstractmethod
def release_dataset(self, params):
pass
class TimeSplitDataset(Dataset):
def __init__(self, config):
Dataset.__init__(self, config)
if 'data_donefile' not in config or config['data_donefile'] is None:
config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
self._path_generator = kagle_util.PathGenerator({'templates' : [
{'name': 'data_path', 'template': config['data_path']},
{'name': 'donefile_path', 'template': config['data_donefile']}
]})
self._split_interval = config['split_interval'] # data split N mins per dir
self._data_file_handler = kagle_fs.FileHandler(config)
def _format_data_time(self, daytime_str, time_window_mins):
data_time = kagle_util.make_datetime(daytime_str)
mins_of_day = data_time.hour * 60 + data_time.minute
begin_stage = mins_of_day / self._split_interval
end_stage = (mins_of_day + time_window_mins) / self._split_interval
if begin_stage == end_stage and mins_of_day % self._split_interval != 0:
return None, 0
if mins_of_day % self._split_interval != 0:
skip_mins = self._split_interval - (mins_of_day % self._split_interval)
data_time = data_time + datetime.timedelta(minutes=skip_mins)
time_window_mins = time_window_mins - skip_mins
return data_time,time_window_mins
def check_ready(self, daytime_str, time_window_mins):
is_ready = True
data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0:
file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time})
if not self._data_file_handler.is_exist(file_path):
is_ready = False
break
time_window_mins = time_window_mins - self._split_interval
data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return is_ready
def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0):
data_file_list = []
data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0:
file_path = self._path_generator.generate_path('data_path', {'time_format': data_time})
sub_file_list = self._data_file_handler.ls(file_path)
for sub_file in sub_file_list:
sub_file_name = self._data_file_handler.get_file_name(sub_file)
if not sub_file_name.startswith(self._config['filename_prefix']):
continue
if hash(sub_file_name) % node_num == node_idx:
data_file_list.append(sub_file)
time_window_mins = time_window_mins - self._split_interval
data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return data_file_list
class FluidTimeSplitDataset(TimeSplitDataset):
def __init__(self, config):
TimeSplitDataset.__init__(self, config)
def _alloc_dataset(self, file_list):
dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type'])
dataset.set_batch_size(self._config['batch_size'])
dataset.set_thread(self._config['load_thread'])
dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi'])
dataset.set_pipe_command(self._config['data_converter'])
dataset.set_filelist(file_list)
dataset.set_use_var(self._config['data_vars'])
#dataset.set_fleet_send_sleep_seconds(2)
#dataset.set_fleet_send_batch_size(80000)
return dataset
def load_dataset(self, params):
begin_time = params['begin_time']
windown_min = params['time_window_min']
if begin_time not in self._datasets:
while self.check_ready(begin_time, windown_min) == False:
print("dataset not ready, time:" + begin_time)
time.sleep(30)
file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx'])
self._datasets[begin_time] = self._alloc_dataset(file_list)
self._datasets[begin_time].load_into_memory()
else:
self._datasets[begin_time].wait_preload_done()
return self._datasets[begin_time]
def preload_dataset(self, params):
begin_time = params['begin_time']
windown_min = params['time_window_min']
if begin_time not in self._datasets:
if self.check_ready(begin_time, windown_min):
file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx'])
self._datasets[begin_time] = self._alloc_dataset(file_list)
self._datasets[begin_time].preload_into_memory(self._config['preload_thread'])
return True
return False
def release_dataset(self, params):
begin_time = params['begin_time']
windown_min = params['time_window_min']
if begin_time in self._datasets:
self._datasets[begin_time].release_memory()
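# --- Usage sketch (not part of the commit): how FluidTimeSplitDataset is typically driven.
# Every config key below is read by the classes above; the concrete values are hypothetical
# placeholders, and a working fleet/AFS environment is needed to actually load data.
example_dataset_config = {
    'name': 'join_data',                                   # hypothetical dataset name
    'data_path': 'afs://cluster/app/data/%Y%m%d/%H%M',     # strftime-style path template (hypothetical)
    'data_donefile': None,                                  # defaults to data_path + "/to.hadoop.done"
    'split_interval': 5,                                    # one sub-directory per 5 minutes
    'filename_prefix': 'part',
    'dataset_type': 'InMemoryDataset',
    'batch_size': 32,
    'load_thread': 10,
    'preload_thread': 10,
    'fs_name': 'afs://cluster', 'fs_ugi': 'user,passwd',    # hypothetical credentials
    'data_converter': 'python data_reader.py',              # hypothetical pipe command
    'data_vars': [],                                         # fluid data variables taken from the model
}
# dataset = FluidTimeSplitDataset(example_dataset_config)
# params = {'begin_time': '202001010000', 'time_window_min': 60, 'node_num': 1, 'node_idx': 0}
# dataset.preload_dataset(params)   # optional: asynchronously preload the next pass
# d = dataset.load_dataset(params)  # blocks until donefiles exist, then loads into memory
# ... train on d ...
# dataset.release_dataset(params)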
import os
import time
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
def is_afs_path(path):
if path.startswith("afs") or path.startswith("hdfs"):
return True
return False
class LocalFSClient:
def __init__(self):
pass
def write(self, content, path, mode):
temp_dir = os.path.dirname(path)
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
f = open(path, mode)
f.write(content)
f.flush()
f.close()
def cp(self, org_path, dest_path):
temp_dir = os.path.dirname(dest_path)
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
return os.system("cp -r " + org_path + " " + dest_path)
def cat(self, file_path):
f = open(file_path)
content = f.read()
f.close()
return content
def mkdir(self, dir_name):
os.system("mkdir -p " + path)
def remove(self, path):
os.system("rm -rf " + path)
def is_exist(self, path):
if os.system("ls " + path) == 0:
return True
return False
def ls(self, path):
files = os.listdir(path)
files = [ path + '/' + fi for fi in files ]
return files
class FileHandler:
def __init__(self, config):
if 'fs_name' in config:
hadoop_home="$HADOOP_HOME"
hdfs_configs = {
"hadoop.job.ugi": config['fs_ugi'],
"fs.default.name": config['fs_name']
}
self._hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
self._local_fs_client = LocalFSClient()
def is_exist(self, path):
if is_afs_path(path):
return self._hdfs_client.is_exist(path)
else:
return self._local_fs_client.is_exist(path)
def get_file_name(self, path):
sub_paths = path.split('/')
return sub_paths[-1]
def write(self, content, dest_path, mode='w'):
if is_afs_path(dest_path):
file_name = self.get_file_name(dest_path)
temp_local_file = "./tmp/" + file_name
self._local_fs_client.remove(temp_local_file)
org_content = ""
if mode.find('a') >= 0:
org_content = self._hdfs_client.cat(dest_path)
content = org_content + content # append mode: keep the existing remote content first
self._local_fs_client.write(content, temp_local_file, mode) # fleet hdfs_client only supports upload, so write a local tmp file first
self._hdfs_client.delete(dest_path + ".tmp")
self._hdfs_client.upload(dest_path + ".tmp", temp_local_file)
self._hdfs_client.delete(dest_path + ".bak")
self._hdfs_client.rename(dest_path, dest_path + '.bak')
self._hdfs_client.rename(dest_path + ".tmp", dest_path)
else:
self._local_fs_client.write(content, dest_path, mode)
def cat(self, path):
if is_afs_path(path):
print("xxh go cat " + path)
hdfs_cat = self._hdfs_client.cat(path)
print(hdfs_cat)
return hdfs_cat
else:
return self._local_fs_client.cat(path)
def ls(self, path):
if is_afs_path(path):
return self._hdfs_client.ls(path)
else:
return self._local_fs_client.ls(path)
def cp(self, org_path, dest_path):
org_is_afs = is_afs_path(org_path)
dest_is_afs = is_afs_path(dest_path)
if not org_is_afs and not dest_is_afs:
return self._local_fs_client.cp(org_path, dest_path)
if not org_is_afs and dest_is_afs:
return self._hdfs_client.upload(dest_path, org_path)
if org_is_afs and not dest_is_afs:
return self._hdfs_client.download(org_path, dest_path)
print("Not Suppor hdfs cp currently")
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
class Layer(object):
__metaclass__=ABCMeta
def __init__(self, config):
pass
def generate(self, mode, param):
if mode == 'fluid':
return self.generate_fluid(param)
elif mode == 'tensorflow':
return self.generate_tensorflow(param)
print('unsupported mode: ' + mode)
return None,None
@abstractmethod
def generate_fluid(self, param):
pass
# maybe
#@abstractmethod
def generate_tensorflow(self, param):
pass
class EmbeddingInputLayer(Layer):
def __init__(self, config):
self._cvm = config['cvm']
self._name = config['name']
self._slots = [ str(slot) for slot in config['slots'] ]
self._mf_dim = config['mf_dim']
self._backward = config['backward']
self._emb_dim = self._mf_dim + 3 #append show ctr lr
self._emb_layers = []
def generate_fluid(self, param):
show_clk = fluid.layers.concat(
[param['layer']['show'], param['layer']['click']], axis=1)
show_clk.stop_gradient = True
data_var = []
for slot in self._slots:
l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1)
data_var.append(l)
emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], is_sparse = True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding"))
emb = fluid.layers.sequence_pool(input=emb, pool_type='sum')
emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm)
self._emb_layers.append(emb)
output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name)
return output, {'data_var' : data_var}
class LabelInputLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._dim = config.get('dim', 1)
self._data_type = config.get('data_type', "int64")
self._label_idx = config['label_idx']
def generate_fluid(self, param):
label = fluid.layers.data(name=self._name, shape=[-1, self._dim], dtype=self._data_type, lod_level=0, append_batch_size=False)
cast_label = fluid.layers.cast(label, dtype='float32')
cast_label.stop_gradient = True
return cast_label, {'data_var' : [label]}
class TagInputLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._tag = config['tag']
self._dim = config.get('dim', 1)
self._data_type = config['data_type']
def generate_fluid(self, param):
output = fluid.layers.data(name=self._name, shape=[-1, self._dim], dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True)
return output, {'data_var' : [output]}
class ParamLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._coln = config['coln']
self._table_id = config.get('table_id', -1)
self._init_range = config.get('init_range', 1)
self._data_type = config.get('data_type', 'float32')
self._config = config
def generate_fluid(self, param):
return self._config, {'inference_param': {'name':'param', 'params': [], 'table_id': self._table_id}}
class SummaryLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._table_id = config.get('table_id', -1)
self._data_type = config.get('data_type', 'float32')
self._config = config
def generate_fluid(self, param):
return self._config, {'inference_param': {'name':'summary', 'params': [], 'table_id': self._table_id}}
class NormalizetionLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._input = config['input']
self._summary = config['summary']
self._table_id = config.get('table_id', -1)
def generate_fluid(self, param):
input_layer = param['layer'][self._input[0]]
summary_layer = param['layer'][self._summary]
if len(self._input) > 0:
input_list=[ param['layer'][i] for i in self._input ]
input_layer = fluid.layers.concat(input=input_list, axis=1)
bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={
"batch_size":1e4,
"batch_sum_default":0.0,
"batch_square":1e4})
inference_param = [ self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum' ]
return bn, {'inference_param' : { 'name':'summary', 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}}
class NeuralLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._param = config['param']
self._input = config['input']
self._bias = config.get('bias', True)
self._act_func = config.get('act_func', None)
def generate_fluid(self, param):
param_layer = param['layer'][self._param]
input_layer = param['layer'][self._input[0]]
if len(self._input) > 0:
input_list=[ param['layer'][i] for i in self._input ]
input_layer = fluid.layers.concat(input=input_list, axis=1)
input_coln = input_layer.shape[1]
scale = param_layer['init_range'] / (input_coln ** 0.5)
bias = None
if self._bias:
bias = fluid.ParamAttr(learning_rate=1.0, initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale))
fc = fluid.layers.fc(
name = self._name,
input = input_layer,
size = param_layer['coln'],
act = self._act_func,
param_attr = \
fluid.ParamAttr(learning_rate=1.0, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)),
bias_attr = bias)
inference_param = [self._name + '.w_0', self._name + '.b_0']
return fc, {'inference_param' : {'name':'param', 'params': inference_param, 'table_id': param_layer.get('table_id', -1)}}
class SigmoidLossLayer(Layer):
def __init__(self, config):
self._name = config['name']
self._label = config['label']
self._input = config['input']
self._weight = config.get('weight', None)
self._metric_label = config.get('metric_label', None)
self._bound = config.get('bound', [-15.0, 15.0])
self._extend_output = {
'metric_label' : self._metric_label,
'metric_dict' : {
'auc' : { 'var' : None},
'batch_auc' : {'var' : None},
'stat_pos' : {'var' : None, 'data_type' : 'int64'},
'stat_neg' : {'var' : None, 'data_type' : 'int64'},
'batch_stat_pos' : {'var' : None, 'data_type' : 'int64'},
'batch_stat_neg' : {'var' : None, 'data_type' : 'int64'},
'pos_ins_num' : {'var' : None},
'abserr': {'var' : None},
'sqrerr': {'var' : None},
'prob': {'var' : None},
'total_ins_num': {'var' : None},
'q': {'var' : None}
}
}
def generate_fluid(self, param):
input_layer = param['layer'][self._input[0]]
label_layer = param['layer'][self._label]
output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name = self._name)
norm = fluid.layers.sigmoid(output, name=self._name)
output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32'))
if self._weight:
weight_layer = param['layer'][self._weight]
output = fluid.layers.elementwise_mul(output, weight_layer)
output = fluid.layers.mean(x=output)
self._extend_output['loss'] = output
#For AUC Metric
metric = self._extend_output['metric_dict']
binary_predict = fluid.layers.concat(
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1)
metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \
metric['batch_stat_neg']['var'], metric['stat_pos']['var'], metric['stat_neg']['var']] = \
fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), curve='ROC', num_thresholds=32)
metric['sqrerr']['var'], metric['abserr']['var'], metric['prob']['var'], metric['q']['var'], \
metric['pos_ins_num']['var'], metric['total_ins_num']['var'] = \
fluid.contrib.layers.ctr_metric_bundle(norm, fluid.layers.cast(x=label_layer, dtype='float32'))
return norm, self._extend_output
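# --- Configuration sketch (assumption): each Layer above is built from a YAML node whose
# 'class' field names the subclass, and generate(mode, param) returns a pair of
# (layer_output, extend_output). The node values below are hypothetical.
example_label_node = {'class': 'LabelInputLayer', 'name': 'label_ctr', 'label_idx': 0}
example_fc_node = {'class': 'NeuralLayer', 'name': 'fc_1', 'param': 'dnn_param',
                   'input': ['emb'], 'bias': True, 'act_func': 'relu'}
# Inside a fluid program guard, kagle_model.Model.build_model (defined further below) does
# roughly:  layer = kagle_layer.<node['class']>(node)
#           layer_output, extend_output = layer.generate('fluid', build_param)
# where extend_output may carry 'data_var', 'loss', 'metric_dict' or 'inference_param'
# entries that the model then collects.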
import math
import time
import numpy as np
import kagle_util
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
class Metric(object):
__metaclass__=ABCMeta
def __init__(self, config):
pass
@abstractmethod
def clear(self, scope, params):
pass
@abstractmethod
def calculate(self, scope, params):
pass
@abstractmethod
def get_result(self):
pass
@abstractmethod
def get_result_to_string(self):
pass
class PaddleAUCMetric(Metric):
def __init__(self, config):
pass
def clear(self, scope, params):
self._label = params['label']
self._metric_dict = params['metric_dict']
self._result = {}
place=fluid.CPUPlace()
for metric_name in self._metric_dict:
metric_config = self._metric_dict[metric_name]
if scope.find_var(metric_config['var'].name) is None:
continue
metric_var = scope.var(metric_config['var'].name).get_tensor()
data_type = 'float32'
if 'data_type' in metric_config:
data_type = metric_config['data_type']
data_array = np.zeros(metric_var._get_dims()).astype(data_type)
metric_var.set(data_array, place)
pass
def get_metric(self, scope, metric_name):
metric = np.array(scope.find_var(metric_name).get_tensor())
old_metric_shape = np.array(metric.shape)
metric = metric.reshape(-1)
global_metric = np.copy(metric) * 0
fleet._role_maker._node_type_comm.Allreduce(metric, global_metric)
global_metric = global_metric.reshape(old_metric_shape)
return global_metric[0]
def get_global_metrics(self, scope, metric_dict):
fleet._role_maker._barrier_worker()
result = {}
for metric_name in metric_dict:
metric_item = metric_dict[metric_name]
if scope.find_var(metric_item['var'].name) is None:
result[metric_name] = None
continue
result[metric_name] = self.get_metric(scope, metric_item['var'].name)
return result
def calculate_auc(self, global_pos, global_neg):
num_bucket = len(global_pos)
area = 0.0
pos = 0.0
neg = 0.0
new_pos = 0.0
new_neg = 0.0
total_ins_num = 0
for i in xrange(num_bucket):
index = num_bucket - 1 - i
new_pos = pos + global_pos[index]
total_ins_num += global_pos[index]
new_neg = neg + global_neg[index]
total_ins_num += global_neg[index]
area += (new_neg - neg) * (pos + new_pos) / 2
pos = new_pos
neg = new_neg
auc_value = None
if pos * neg == 0 or total_ins_num == 0:
auc_value = 0.5
else:
auc_value = area / (pos * neg)
return auc_value
def calculate_bucket_error(self, global_pos, global_neg):
num_bucket = len(global_pos)
last_ctr = -1.0
impression_sum = 0.0
ctr_sum = 0.0
click_sum = 0.0
error_sum = 0.0
error_count = 0.0
click = 0.0
show = 0.0
ctr = 0.0
adjust_ctr = 0.0
relative_error = 0.0
actual_ctr = 0.0
relative_ctr_error = 0.0
k_max_span = 0.01
k_relative_error_bound = 0.05
for i in xrange(num_bucket):
click = global_pos[i]
show = global_pos[i] + global_neg[i]
ctr = float(i) / num_bucket
if abs(ctr - last_ctr) > k_max_span:
last_ctr = ctr
impression_sum = 0.0
ctr_sum = 0.0
click_sum = 0.0
impression_sum += show
ctr_sum += ctr * show
click_sum += click
if impression_sum == 0:
continue
adjust_ctr = ctr_sum / impression_sum
if adjust_ctr == 0:
continue
relative_error = \
math.sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum))
if relative_error < k_relative_error_bound:
actual_ctr = click_sum / impression_sum
relative_ctr_error = abs(actual_ctr / adjust_ctr - 1)
error_sum += relative_ctr_error * impression_sum
error_count += impression_sum
last_ctr = -1
bucket_error = error_sum / error_count if error_count > 0 else 0.0
return bucket_error
def calculate(self, scope, params):
self._label = params['label']
self._metric_dict = params['metric_dict']
fleet._role_maker._barrier_worker()
result = self.get_global_metrics(scope, self._metric_dict)
if 'stat_pos' in result and 'stat_neg' in result:
result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
result['bucket_error'] = self.calculate_bucket_error(result['stat_pos'], result['stat_neg'])
if 'pos_ins_num' in result:
result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num']
if 'abserr' in result:
result['mae'] = result['abserr'] / result['total_ins_num']
if 'sqrerr' in result:
result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num'])
if 'prob' in result:
result['predict_ctr'] = result['prob'] / result['total_ins_num']
if abs(result['predict_ctr']) > 1e-6:
result['copc'] = result['actual_ctr'] / result['predict_ctr']
if 'q' in result:
result['mean_q'] = result['q'] / result['total_ins_num']
self._result = result
return result
def get_result(self):
return self._result
def get_result_to_string(self):
result = self.get_result()
result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "\
"Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \
(self._label, result['auc'], result['bucket_error'], result['mae'], result['rmse'], result['actual_ctr'],
result['predict_ctr'], result['copc'], result['mean_q'], result['total_ins_num'])
return result_str
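# --- Worked example (illustration only): the AUC above is computed from per-bucket
# positive/negative counts accumulated by fluid.layers.auc (num_thresholds buckets).
# This standalone helper mirrors calculate_auc on a tiny 4-bucket histogram.
def _toy_histogram_auc(global_pos, global_neg):
    area = pos = neg = 0.0
    for index in range(len(global_pos) - 1, -1, -1):    # walk from the highest-score bucket down
        new_pos = pos + global_pos[index]
        new_neg = neg + global_neg[index]
        area += (new_neg - neg) * (pos + new_pos) / 2    # trapezoid under the ROC curve
        pos, neg = new_pos, new_neg
    return 0.5 if pos * neg == 0 else area / (pos * neg)
print(_toy_histogram_auc([0, 1, 2, 7], [6, 3, 1, 0]))    # 0.965: positives concentrate in high buckets
print(_toy_histogram_auc([1, 1, 1, 1], [1, 1, 1, 1]))    # 0.5: uninformative predictions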
import copy
import yaml
import kagle_layer
import kagle_table
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
def create(config):
model = None
if config['mode'] == 'fluid':
model = FluidModel(config)
model.build_model()
return model
class Model(object):
__metaclass__=ABCMeta
def __init__(self, config):
self._config = config
self._name = config['name']
f = open(config['layer_file'], 'r')
self._build_nodes = yaml.safe_load(f.read())
self._build_phase = ['input', 'param', 'summary', 'layer']
self._build_param = {'layer': {}, 'inner_layer':{}, 'layer_extend': {}, 'model': {}}
self._inference_meta = {'dependency':{}, 'params': {}}
self._cost = None
self._metrics = {}
self._data_var = []
pass
def get_cost_op(self):
return self._cost
def get_metrics(self):
return self._metrics
@abstractmethod
def shrink(self, params):
pass
@abstractmethod
def build_model(self):
pass
@abstractmethod
def dump_model_program(self, path):
pass
@abstractmethod
def dump_inference_param(self, params):
pass
@abstractmethod
def dump_inference_program(self, inference_layer, path):
pass
def inference_params(self, inference_layer):
layer = inference_layer
if layer in self._inference_meta['params']:
return self._inference_meta['params'][layer]
self._inference_meta['params'][layer] = []
self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer)
for node in self._build_nodes['layer']:
if node['name'] not in self._inference_meta['dependency'][layer]:
continue
if 'inference_param' in self._build_param['layer_extend'][node['name']]:
self._inference_meta['params'][layer] += self._build_param['layer_extend'][node['name']]['inference_param']['params']
return self._inference_meta['params'][layer]
def get_dependency(self, layer_graph, dest_layer):
dependency_list = []
if dest_layer in layer_graph:
dependencys = copy.deepcopy(layer_graph[dest_layer]['input'])
dependency_list = copy.deepcopy(dependencys)
for dependency in dependencys:
dependency_list = dependency_list + self.get_dependency(layer_graph, dependency)
return list(set(dependency_list))
class FluidModel(Model):
def __init__(self, config):
Model.__init__(self, config)
pass
def build_model(self):
for layer in self._build_nodes['layer']:
self._build_param['inner_layer'][layer['name']] = layer
self._build_param['table'] = {}
self._build_param['model']['train_program'] = fluid.Program()
self._build_param['model']['startup_program'] = fluid.Program()
with fluid.program_guard(self._build_param['model']['train_program'], self._build_param['model']['startup_program']):
with fluid.unique_name.guard():
for phase in self._build_phase:
if self._build_nodes[phase] is None:
continue
for node in self._build_nodes[phase]:
exec("""layer=kagle_layer.{}(node)""".format(node['class']))
layer_output, extend_output = layer.generate(self._config['mode'], self._build_param)
self._build_param['layer'][node['name']] = layer_output
self._build_param['layer_extend'][node['name']] = extend_output
if extend_output is None:
continue
if 'loss' in extend_output:
if self._cost is None:
self._cost = extend_output['loss']
else:
self._cost += extend_output['loss']
if 'data_var' in extend_output:
self._data_var += extend_output['data_var']
if 'metric_label' in extend_output and extend_output['metric_label'] is not None:
self._metrics[extend_output['metric_label']] = extend_output['metric_dict']
if 'inference_param' in extend_output:
param_name = extend_output['inference_param']['name']
if param_name not in self._build_param['table']:
self._build_param['table'][param_name] = {'params':[]}
table_meta = kagle_table.TableMeta.alloc_new_table(extend_output['inference_param']['table_id'])
self._build_param['table'][param_name]['_meta'] = table_meta
self._build_param['table'][param_name]['params'] += extend_output['inference_param']['params']
pass
@classmethod
def build_optimizer(self, params):
optimizer_conf = params['optimizer_conf']
strategy = None
if 'strategy' in optimizer_conf:
strategy = optimizer_conf['strategy']
stat_var_names = []
metrics = params['metrics']
for name in metrics:
model_metrics = metrics[name]
stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics]
strategy['stat_var_names'] = list(set(stat_var_names))
optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
exec(optimizer_generator)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
return optimizer
def dump_model_program(self, path):
with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout:
print >> fout, self._build_param['model']['train_program']
with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout:
print >> fout, self._build_param['model']['startup_program']
pass
def shrink(self, params):
scope = params['scope']
decay = params['decay']
for param_table in self._build_param['table']:
table_id = self._build_param['table'][param_table]['_meta']._table_id
fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)
def dump_inference_program(self, inference_layer, path):
pass
def dump_inference_param(self, params):
scope = params['scope']
executor = params['executor']
program = self._build_param['model']['train_program']
for table_name,table in self._build_param['table'].items():
fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
for inference_item in params['inference_list']:
params_name_list = self.inference_params(inference_item['layer_name'])
params_var_list = [ program.global_block().var(i) for i in params_name_list ]
params_file_name = inference_item['save_file_name']
with fluid.scope_guard(scope):
if params['save_combine']:
fluid.io.save_vars(
executor, "./", program, vars=params_var_list, filename=params_file_name)
else:
fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list)
pass
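# --- Configuration sketch (assumption, values hypothetical): the 'layer_file' that
# Model.build_model reads is YAML with one node list per build phase
# ('input', 'param', 'summary', 'layer'); each node's 'class' names a kagle_layer class.
# Parsing the text below only illustrates the structure; building the actual network still
# needs a fluid program and a fleet environment.
example_layer_file = yaml.safe_load("""
input:
- {class: TagInputLayer, name: show, tag: show, dim: 1, data_type: float32}
- {class: TagInputLayer, name: click, tag: click, dim: 1, data_type: float32}
- {class: LabelInputLayer, name: label_ctr, label_idx: 0}
- {class: EmbeddingInputLayer, name: emb, slots: [101, 102], mf_dim: 8, cvm: true, backward: true}
param:
- {class: ParamLayer, name: fc1_param, coln: 32, init_range: 0.2}
- {class: ParamLayer, name: out_param, coln: 1, init_range: 0.2}
summary:
- {class: SummaryLayer, name: dnn_summary}
layer:
- {class: NormalizetionLayer, name: emb_norm, input: [emb], summary: dnn_summary}
- {class: NeuralLayer, name: fc_1, param: fc1_param, input: [emb_norm], act_func: relu}
- {class: NeuralLayer, name: ctr_out, param: out_param, input: [fc_1], act_func: ~}
- {class: SigmoidLossLayer, name: ctr_loss, input: [ctr_out], label: label_ctr, metric_label: ctr}
""")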
import copy
import yaml
from abc import ABCMeta, abstractmethod
class TableMeta:
TableId = 1
@staticmethod
def alloc_new_table(table_id):
if table_id < 0:
table_id = TableMeta.TableId
if table_id >= TableMeta.TableId:
TableMeta.TableId += 1
table = TableMeta(table_id)
return table
def __init__(self, table_id):
self._table_id = table_id
pass
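# --- Behavior sketch (illustration): alloc_new_table() hands out auto-incrementing ids to
# nodes configured with table_id = -1 and echoes back ids that are fixed explicitly.
t0 = TableMeta.alloc_new_table(-1)   # auto-assigned, _table_id == 1
t1 = TableMeta.alloc_new_table(-1)   # auto-assigned, _table_id == 2
t2 = TableMeta.alloc_new_table(5)    # explicit id kept as 5 (the class counter only bumps by one)
print((t0._table_id, t1._table_id, t2._table_id))   # -> (1, 2, 5)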
import sys
import copy
import yaml
import time
import json
import datetime
import kagle_fs
import kagle_util
import kagle_model
import kagle_dataset
import kagle_metric
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
class Trainer(object):
__metaclass__=ABCMeta
def __init__(self, config):
self._status_processor = {}
self._context = {'status': 'uninit', 'is_exit': False}
def regist_context_processor(self, status_name, processor):
self._status_processor[status_name] = processor
def context_process(self, context):
if context['status'] in self._status_processor:
self._status_processor[context['status']](context)
else:
self.other_status_processor(context)
def other_status_processor(self, context):
print('unknown context_status:%s, do nothing' % context['status'])
time.sleep(60)
def reload_train_context(self):
pass
def run(self):
while True:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
class AbacusPaddleTrainer(Trainer):
def __init__(self, config):
Trainer.__init__(self, config)
config['output_path'] = kagle_util.get_absolute_path(
config['output_path'], config['io']['afs'])
self.global_config = config
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self._exector_context = {}
self._metrics = {}
self._path_generator = kagle_util.PathGenerator({
'templates' : [
{'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
{'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
{'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
{'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
{'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
]
})
if 'path_generator' in config:
self._path_generator.add_path_template(config['path_generator'])
self.regist_context_processor('uninit', self.init)
self.regist_context_processor('startup', self.startup)
self.regist_context_processor('begin_day', self.begin_day)
self.regist_context_processor('train_pass', self.train_pass)
self.regist_context_processor('end_day', self.end_day)
def init(self, context):
fleet.init(self._exe)
data_var_list = []
data_var_name_dict = {}
runnnable_scope = []
runnnable_cost_op = []
context['status'] = 'startup'
for executor in self.global_config['executor']:
scope = fluid.Scope()
self._exector_context[executor['name']] = {}
self._exector_context[executor['name']]['scope'] = scope
self._exector_context[executor['name']]['model'] = kagle_model.create(executor)
model = self._exector_context[executor['name']]['model']
self._metrics.update(model.get_metrics())
runnnable_scope.append(scope)
runnnable_cost_op.append(model.get_cost_op())
for var in model._data_var:
if var.name in data_var_name_dict:
continue
data_var_list.append(var)
data_var_name_dict[var.name] = var
optimizer = kagle_model.FluidModel.build_optimizer({
'metrics' : self._metrics,
'optimizer_conf' : self.global_config['optimizer']
})
optimizer.minimize(runnnable_cost_op, runnnable_scope)
for executor in self.global_config['executor']:
scope = self._exector_context[executor['name']]['scope']
model = self._exector_context[executor['name']]['model']
program = model._build_param['model']['train_program']
if not executor['is_update_sparse']:
program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = []
if 'train_thread_num' not in executor:
executor['train_thread_num'] = self.global_config['train_thread_num']
with fluid.scope_guard(scope):
self._exe.run(model._build_param['model']['startup_program'])
model.dump_model_program('./')
#server init done
if fleet.is_server():
return 0
self._dataset = {}
for dataset_item in self.global_config['dataset']['data_list']:
dataset_item['data_vars'] = data_var_list
dataset_item.update(self.global_config['io']['afs'])
dataset_item["batch_size"] = self.global_config['batch_size']
self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item)
#if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
# util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
fleet.init_worker()
pass
def print_log(self, log_str, params):
params['index'] = fleet.worker_index()
return kagle_util.print_log(log_str, params)
def print_global_metrics(self, scope, model, monitor_data, stdout_str):
metrics = model.get_metrics()
metric_calculator = kagle_metric.PaddleAUCMetric(None)
for metric in metrics:
metric_param = {'label' : metric, 'metric_dict' : metrics[metric]}
metric_calculator.calculate(scope, metric_param)
metric_result = metric_calculator.get_result_to_string()
self.print_log(metric_result, {'master': True, 'stdout' : stdout_str})
monitor_data += metric_result
metric_calculator.clear(scope, metric_param)
def save_model(self, day, pass_index, base_key):
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'save model cost %s sec'})
model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
save_mode = 0 # just save all
if pass_index < 1: #batch_model
save_mode = 3 # unseen_day++, save all
kagle_util.rank0_print("going to save_model %s" % model_path)
fleet.save_persistables(None, model_path, mode=save_mode)
self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
cost_printer.done()
return model_path
def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
stdout_str = ""
xbox_patch_id = str(int(time.time()))
kagle_util.rank0_print("begin save delta model")
model_path = ""
xbox_model_donefile = ""
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'save xbox model cost %s sec', 'stdout' : stdout_str})
if pass_index < 1:
save_mode = 2
xbox_patch_id = xbox_base_key
model_path = self._path_generator.generate_path('xbox_base', {'day' : day})
xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day' : day})
else:
save_mode = 1
model_path = self._path_generator.generate_path('xbox_delta', {'day' : day, 'pass_id':pass_index})
xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day' : day})
total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
cost_printer.done()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
'log_format' : 'save cache model cost %s sec', 'stdout' : stdout_str})
model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs'])
if self.global_config['save_cache_model']:
cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
model_file_handler.write(
"file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
model_path + '/000_cache/sparse_cache.meta', 'w')
cost_printer.done()
kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
save_env_param = {
'executor': self._exe,
'save_combine': True
}
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
'log_format' : 'save dense model cost %s sec', 'stdout' : stdout_str})
for executor in self.global_config['executor']:
if 'layer_for_inference' not in executor:
continue
executor_name = executor['name']
model = self._exector_context[executor_name]['model']
save_env_param['inference_list'] = executor['layer_for_inference']
save_env_param['scope'] = self._exector_context[executor_name]['scope']
model.dump_inference_param(save_env_param)
for dnn_layer in executor['layer_for_inference']:
model_file_handler.cp(dnn_layer['save_file_name'],
model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
cost_printer.done()
xbox_done_info = {
"id" : xbox_patch_id,
"key" : xbox_base_key,
"ins_path" : "",
"ins_tag" : "feasign",
"partition_type" : "2",
"record_count" : "111111",
"monitor_data" : monitor_data,
"mpi_size" : str(fleet.worker_num()),
"input" : model_path.rstrip("/") + "/000",
"job_id" : kagle_util.get_env_value("JOB_ID"),
"job_name" : kagle_util.get_env_value("JOB_NAME")
}
model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
if pass_index > 0:
self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
return stdout_str
def run_executor(self, executor_config, dataset, stdout_str):
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = self._train_pass._base_key
executor_name = executor_config['name']
scope = self._exector_context[executor_name]['scope']
model = self._exector_context[executor_name]['model']
with fluid.scope_guard(scope):
kagle_util.rank0_print("Begin " + executor_name + " pass")
begin = time.time()
program = model._build_param['model']['train_program']
self._exe.train_from_dataset(program, dataset, scope,
thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
end = time.time()
local_cost = (end-begin) / 60.0
avg_cost = kagle_util.worker_numric_avg(local_cost)
min_cost = kagle_util.worker_numric_min(local_cost)
max_cost = kagle_util.worker_numric_max(local_cost)
kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
self._exector_context[executor_name]['cost'] = max_cost
monitor_data = ""
self.print_global_metrics(scope, model, monitor_data, stdout_str)
kagle_util.rank0_print("End " + executor_name + " pass")
if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
def startup(self, context):
if fleet.is_server():
fleet.run_server()
context['status'] = 'wait'
return
stdout_str = ""
self._train_pass = kagle_util.TimeTrainPass(self.global_config)
if not self.global_config['cold_start']:
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str})
self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
#if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
# and config.reqi_dnn_plugin_pass >= self._pass_id:
# fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
#else:
fleet.init_server(self._train_pass._checkpoint_model_path, mode=0)
cost_printer.done()
if self.global_config['save_first_base']:
self.print_log("save_first_base=True", {'master': True})
self.print_log("going to save xbox base model", {'master': True, 'stdout' : stdout_str})
self._train_pass._base_key = int(time.time())
stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
context['status'] = 'begin_day'
def begin_day(self, context):
stdout_str = ""
if not self._train_pass.next():
context['is_exit'] = True
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout' : stdout_str})
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
else:
context['status'] = 'train_pass'
def end_day(self, context):
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = int(time.time())
context['status'] = 'begin_day'
kagle_util.rank0_print("shrink table")
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'shrink table done, cost %s sec'})
fleet.shrink_sparse_table()
for executor in self._exector_context:
self._exector_context[executor]['model'].shrink({
'scope': self._exector_context[executor]['scope'],
'decay': self.global_config['optimizer']['dense_decay_rate']
})
cost_printer.done()
next_date = self._train_pass.date(delta_day=1)
kagle_util.rank0_print("going to save xbox base model")
self.save_xbox_model(next_date, 0, xbox_base_key, "")
kagle_util.rank0_print("going to save batch model")
self.save_model(next_date, 0, xbox_base_key)
self._train_pass._base_key = xbox_base_key
def train_pass(self, context):
stdout_str = ""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
base_key = self._train_pass._base_key
pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout' : stdout_str})
train_begin_time = time.time()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'load into memory done, cost %s sec', 'stdout' : stdout_str})
current_dataset = {}
for name in self._dataset:
current_dataset[name] = self._dataset[name].load_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
cost_printer.done()
kagle_util.rank0_print("going to global shuffle")
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {
'master': True, 'stdout' : stdout_str,
'log_format' : 'global shuffle done, cost %s sec'})
for name in current_dataset:
current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
cost_printer.done()
# str(dataset.get_shuffle_data_size(fleet))
if self.global_config['prefetch_data']:
next_pass_time = (self._train_pass._current_train_time +
datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
for name in self._dataset:
self._dataset[name].preload_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
pure_train_begin = time.time()
for executor in self.global_config['executor']:
self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'release_memory cost %s sec'})
for name in current_dataset:
current_dataset[name].release_memory()
pure_train_cost = time.time() - pure_train_begin
if self._train_pass.is_checkpoint_pass(pass_id):
self.save_model(day, pass_id, base_key)
train_end_time = time.time()
train_cost = train_end_time - train_begin_time
other_cost = train_cost - pure_train_cost
log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
for executor in self._exector_context:
log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
log_str += '[other_cost:' + str(other_cost) + ']'
kagle_util.rank0_print(log_str)
stdout_str += kagle_util.now_time_str() + log_str
sys.stdout.write(stdout_str)
stdout_str = ""
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
return
elif not self._train_pass.next():
context['is_exit'] = True
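# --- Driver sketch (assumption, not part of the commit): how a job would typically start
# the trainer. The yaml file name is hypothetical, and an MPI/fleet environment with pslib
# parameter servers is required, so the helper is only defined here, never called.
def _example_main(config_path='kagle_job.yaml'):   # hypothetical config file
    job_config = yaml.safe_load(open(config_path))
    trainer = AbacusPaddleTrainer(job_config)
    # run() keeps dispatching on context['status']:
    # uninit -> startup -> begin_day -> (train_pass ... end_day) -> exit
    trainer.run()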
import os
import sys
import time
import datetime
import kagle_fs
import numpy as np
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
def get_env_value(env_name):
return os.popen("echo -n ${" + env_name + "}").read().strip()
def now_time_str():
return "\n" + time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) + "[0]:"
def get_absolute_path(path, params):
if path.startswith('afs:') or path.startswith('hdfs:'):
sub_path = path.split('fs:')[1]
if ':' in sub_path: # such as afs://xxx:port/xxxx
return path
elif 'fs_name' in params:
return params['fs_name'] + sub_path
else:
return path
def make_datetime(date_str, fmt = None):
if fmt is None:
if len(date_str) == 8: #%Y%m%d
return datetime.datetime.strptime(date_str, '%Y%m%d')
if len(date_str) == 12: #%Y%m%d%H%M
return datetime.datetime.strptime(date_str, '%Y%m%d%H%M')
return datetime.datetime.strptime(date_str, fmt)
def worker_numric_opt(value, opt):
local_value = np.array([value])
global_value = np.copy(local_value) * 0
fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt)
return global_value[0]
def worker_numric_sum(value):
from mpi4py import MPI
return worker_numric_opt(value, MPI.SUM)
def worker_numric_avg(value):
return worker_numric_sum(value) / fleet.worker_num()
def worker_numric_min(value):
from mpi4py import MPI
return worker_numric_opt(value, MPI.MIN)
def worker_numric_max(value):
from mpi4py import MPI
return worker_numric_opt(value, MPI.MAX)
def rank0_print(log_str):
print_log(log_str, {'master': True})
def print_log(log_str, params):
if params['master']:
if fleet.worker_index() == 0:
print(log_str)
sys.stdout.flush()
else:
print(log_str)
if 'stdout' in params:
params['stdout'] += str(datetime.datetime.now()) + log_str
def print_cost(cost, params):
log_str = params['log_format'] % cost
print_log(log_str, params)
return log_str
class CostPrinter:
def __init__(self, callback, callback_params):
self.reset(callback, callback_params)
pass
def __del__(self):
if not self._done:
self.done()
pass
def reset(self, callback, callback_params):
self._done = False
self._callback = callback
self._callback_params = callback_params
self._begin_time = time.time()
pass
def done(self):
cost = time.time() - self._begin_time
log_str = self._callback(cost, self._callback_params) #cost(s)
self._done = True
return cost, log_str
class PathGenerator:
def __init__(self, config):
self._templates = {}
self.add_path_template(config)
pass
def add_path_template(self, config):
if 'templates' in config:
for template in config['templates']:
self._templates[template['name']] = template['template']
pass
def generate_path(self, template_name, param):
if template_name in self._templates:
if 'time_format' in param:
format_str = param['time_format'].strftime(self._templates[template_name])
return format_str.format(**param)
return self._templates[template_name].format(**param)
else:
return ""
class TimeTrainPass:
def __init__(self, global_config):
self._config = global_config['epoch']
if '+' in self._config['days']:
day_str = self._config['days'].replace(' ', '')
day_fields = day_str.split('+')
self._begin_day = make_datetime(day_fields[0].strip())
if len(day_fields) == 1 or len(day_fields[1]) == 0:
# 100 years, i.e. keep running continuously
self._end_day = self._begin_day + datetime.timedelta(days=36500)
else:
# example: 2020212+10
run_day = int(day_fields[1].strip())
self._end_day =self._begin_day + datetime.timedelta(days=run_day)
else:
# example: {20191001..20191031}
days = os.popen("echo -n " + self._config['days']).read().split(" ")
self._begin_day = make_datetime(days[0])
self._end_day = make_datetime(days[len(days) - 1])
self._checkpoint_interval = self._config['checkpoint_interval']
self._dump_inference_interval = self._config['dump_inference_interval']
self._interval_per_pass = self._config['train_time_interval'] #train N min data per pass
self._pass_id = 0
self._inference_pass_id = 0
self._pass_donefile_handler = None
if 'pass_donefile_name' in self._config:
self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name']
if kagle_fs.is_afs_path(self._train_pass_donefile):
self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['afs'])
else:
self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['local_fs'])
last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1]
done_fields = last_done.split('\t')
if len(done_fields) > 4:
self._base_key = done_fields[1]
self._checkpoint_model_path = done_fields[2]
self._checkpoint_pass_id = int(done_fields[3])
self._inference_pass_id = int(done_fields[4])
self.init_pass_by_id(done_fields[0], self._checkpoint_pass_id)
def max_pass_num_day(self):
return 24 * 60 / self._interval_per_pass
def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint):
if is_checkpoint:
self._checkpoint_pass_id = pass_id
self._checkpoint_model_path = model_path
done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key,
self._checkpoint_model_path, self._checkpoint_pass_id, pass_id)
self._pass_donefile_handler.write(done_content, self._train_pass_donefile, 'a')
pass
def init_pass_by_id(self, date_str, pass_id):
date_time = make_datetime(date_str)
if pass_id < 1:
pass_id = 0
if (date_time - self._begin_day).total_seconds() > 0:
self._begin_day = date_time
self._pass_id = pass_id
mins = self._interval_per_pass * (pass_id - 1)
self._current_train_time = date_time + datetime.timedelta(minutes=mins)
print(self._current_train_time)
def init_pass_by_time(self, datetime_str):
self._current_train_time = make_datetime(datetime_str)
mins = self._current_train_time.hour * 60 + self._current_train_time.minute
self._pass_id = mins / self._interval_per_pass + 1
def current_pass(self):
return self._pass_id
def next(self):
has_next = True
old_pass_id = self._pass_id
if self._pass_id < 1:
self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M"))
else:
next_time = self._current_train_time + datetime.timedelta(minutes=self._interval_per_pass)
if (next_time - self._end_day).total_seconds() > 0:
has_next = False
else:
self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M"))
if has_next and (self._inference_pass_id < self._pass_id or self._pass_id < old_pass_id):
self._inference_pass_id = self._pass_id - 1
return has_next
def is_checkpoint_pass(self, pass_id):
if pass_id < 1:
return True
if pass_id == self.max_pass_num_day():
return False
if pass_id % self._checkpoint_interval == 0:
return True
return False
def need_dump_inference(self, pass_id):
return self._inference_pass_id < pass_id and pass_id % self._dump_inference_interval == 0
def date(self, delta_day=0):
return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d")
def timestamp(self, delta_day=0):
return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp()
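# --- Usage sketch (illustration): PathGenerator templates are passed through strftime first
# and then str.format, so a template can mix %-style time codes with {}-style fields. The
# template below is hypothetical.
pg = PathGenerator({'templates': [
    {'name': 'delta_model', 'template': '/output/xbox/%Y%m%d/delta-{pass_id}/'}
]})
print(pg.generate_path('delta_model',
                       {'time_format': make_datetime('202001011230'), 'pass_id': 7}))
# -> /output/xbox/20200101/delta-7/
# With epoch config train_time_interval = 15 minutes, a day holds 24*60/15 = 96 passes,
# which is what TimeTrainPass.max_pass_num_day() returns.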
import sys
import copy
import yaml
import time
import json
import datetime
import kagle_trainer
from .. import kagle_fs
from .. import kagle_util
from .. import kagle_model
from .. import kagle_metric
from .. import kagle_dataset
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
class AbacusPaddleTrainer(kagle_trainer.Trainer):
def __init__(self, config):
kagle_trainer.Trainer.__init__(self, config)
config['output_path'] = kagle_util.get_absolute_path(
config['output_path'], config['io']['afs'])
self.global_config = config
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self._exector_context = {}
self._metrics = {}
self._path_generator = kagle_util.PathGenerator({
'templates' : [
{'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
{'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
{'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
{'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
{'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
]
})
if 'path_generator' in config:
self._path_generator.add_path_template(config['path_generator'])
self.regist_context_processor('uninit', self.init)
self.regist_context_processor('startup', self.startup)
self.regist_context_processor('begin_day', self.begin_day)
self.regist_context_processor('train_pass', self.train_pass)
self.regist_context_processor('end_day', self.end_day)
def init(self, context):
fleet.init(self._exe)
data_var_list = []
data_var_name_dict = {}
runnnable_scope = []
runnnable_cost_op = []
context['status'] = 'startup'
for executor in self.global_config['executor']:
scope = fluid.Scope()
self._exector_context[executor['name']] = {}
self._exector_context[executor['name']]['scope'] = scope
self._exector_context[executor['name']]['model'] = kagle_model.create(executor)
model = self._exector_context[executor['name']]['model']
self._metrics.update(model.get_metrics())
runnnable_scope.append(scope)
runnnable_cost_op.append(model.get_cost_op())
for var in model._data_var:
if var.name in data_var_name_dict:
continue
data_var_list.append(var)
data_var_name_dict[var.name] = var
optimizer = kagle_model.FluidModel.build_optimizer({
'metrics' : self._metrics,
'optimizer_conf' : self.global_config['optimizer']
})
optimizer.minimize(runnnable_cost_op, runnnable_scope)
for executor in self.global_config['executor']:
scope = self._exector_context[executor['name']]['scope']
model = self._exector_context[executor['name']]['model']
program = model._build_param['model']['train_program']
if not executor['is_update_sparse']:
program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = []
if 'train_thread_num' not in executor:
executor['train_thread_num'] = self.global_config['train_thread_num']
with fluid.scope_guard(scope):
self._exe.run(model._build_param['model']['startup_program'])
model.dump_model_program('./')
#server init done
if fleet.is_server():
return 0
self._dataset = {}
for dataset_item in self.global_config['dataset']['data_list']:
dataset_item['data_vars'] = data_var_list
dataset_item.update(self.global_config['io']['afs'])
dataset_item["batch_size"] = self.global_config['batch_size']
self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item)
#if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
# util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
fleet.init_worker()
pass
def print_log(self, log_str, params):
params['index'] = fleet.worker_index()
return kagle_util.print_log(log_str, params)
def print_global_metrics(self, scope, model, monitor_data, stdout_str):
metrics = model.get_metrics()
metric_calculator = kagle_metric.PaddleAUCMetric(None)
for metric in metrics:
metric_param = {'label' : metric, 'metric_dict' : metrics[metric]}
metric_calculator.calculate(scope, metric_param)
metric_result = metric_calculator.get_result_to_string()
self.print_log(metric_result, {'master': True, 'stdout' : stdout_str})
monitor_data += metric_result
metric_calculator.clear(scope, metric_param)
def save_model(self, day, pass_index, base_key):
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'save model cost %s sec'})
model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
save_mode = 0 # just save all
if pass_index < 1: #batch_model
save_mode = 3 # unseen_day++, save all
kagle_util.rank0_print("going to save_model %s" % model_path)
fleet.save_persistables(None, model_path, mode=save_mode)
self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
cost_printer.done()
return model_path
def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
stdout_str = ""
xbox_patch_id = str(int(time.time()))
kagle_util.rank0_print("begin save delta model")
model_path = ""
xbox_model_donefile = ""
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'save xbox model cost %s sec', 'stdout' : stdout_str})
if pass_index < 1:
save_mode = 2
xbox_patch_id = xbox_base_key
model_path = self._path_generator.generate_path('xbox_base', {'day' : day})
xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day' : day})
else:
save_mode = 1
model_path = self._path_generator.generate_path('xbox_delta', {'day' : day, 'pass_id':pass_index})
xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day' : day})
total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
cost_printer.done()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
'log_format' : 'save cache model cost %s sec', 'stdout' : stdout_str})
model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs'])
if self.global_config['save_cache_model']:
cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
model_file_handler.write(
"file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
model_path + '/000_cache/sparse_cache.meta', 'w')
cost_printer.done()
kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
save_env_param = {
'executor': self._exe,
'save_combine': True
}
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
'log_format' : 'save dense model cost %s sec', 'stdout' : stdout_str})
for executor in self.global_config['executor']:
if 'layer_for_inference' not in executor:
continue
executor_name = executor['name']
model = self._exector_context[executor_name]['model']
save_env_param['inference_list'] = executor['layer_for_inference']
save_env_param['scope'] = self._exector_context[executor_name]['scope']
model.dump_inference_param(save_env_param)
for dnn_layer in executor['layer_for_inference']:
model_file_handler.cp(dnn_layer['save_file_name'],
model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
cost_printer.done()
xbox_done_info = {
"id" : xbox_patch_id,
"key" : xbox_base_key,
"ins_path" : "",
"ins_tag" : "feasign",
"partition_type" : "2",
"record_count" : "111111",
"monitor_data" : monitor_data,
"mpi_size" : str(fleet.worker_num()),
"input" : model_path.rstrip("/") + "/000",
"job_id" : kagle_util.get_env_value("JOB_ID"),
"job_name" : kagle_util.get_env_value("JOB_NAME")
}
model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
if pass_index > 0:
self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
return stdout_str
def run_executor(self, executor_config, dataset, stdout_str):
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = self._train_pass._base_key
executor_name = executor_config['name']
scope = self._exector_context[executor_name]['scope']
model = self._exector_context[executor_name]['model']
with fluid.scope_guard(scope):
kagle_util.rank0_print("Begin " + executor_name + " pass")
begin = time.time()
program = model._build_param['model']['train_program']
self._exe.train_from_dataset(program, dataset, scope,
thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
end = time.time()
local_cost = (end-begin) / 60.0
avg_cost = kagle_util.worker_numric_avg(local_cost)
min_cost = kagle_util.worker_numric_min(local_cost)
max_cost = kagle_util.worker_numric_max(local_cost)
kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
self._exector_context[executor_name]['cost'] = max_cost
monitor_data = ""
self.print_global_metrics(scope, model, monitor_data, stdout_str)
kagle_util.rank0_print("End " + executor_name + " pass")
if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
def startup(self, context):
if fleet.is_server():
fleet.run_server()
context['status'] = 'wait'
return
stdout_str = ""
self._train_pass = kagle_util.TimeTrainPass(self.global_config)
if not self.global_config['cold_start']:
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str})
self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
#if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
# and config.reqi_dnn_plugin_pass >= self._pass_id:
# fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
#else:
fleet.init_server(self._train_pass._checkpoint_model_path, mode=0)
cost_printer.done()
if self.global_config['save_first_base']:
self.print_log("save_first_base=True", {'master': True})
self.print_log("going to save xbox base model", {'master': True, 'stdout' : stdout_str})
self._train_pass._base_key = int(time.time())
stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
context['status'] = 'begin_day'
def begin_day(self, context):
stdout_str = ""
if not self._train_pass.next():
context['is_exit'] = True
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout' : stdout_str})
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
else:
context['status'] = 'train_pass'
def end_day(self, context):
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = int(time.time())
context['status'] = 'begin_day'
kagle_util.rank0_print("shrink table")
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'shrink table done, cost %s sec'})
fleet.shrink_sparse_table()
for executor in self._exector_context:
self._exector_context[executor]['model'].shrink({
'scope': self._exector_context[executor]['scope'],
'decay': self.global_config['optimizer']['dense_decay_rate']
})
cost_printer.done()
next_date = self._train_pass.date(delta_day=1)
kagle_util.rank0_print("going to save xbox base model")
self.save_xbox_model(next_date, 0, xbox_base_key, "")
kagle_util.rank0_print("going to save batch model")
self.save_model(next_date, 0, xbox_base_key)
self._train_pass._base_key = xbox_base_key
def train_pass(self, context):
stdout_str = ""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
base_key = self._train_pass._base_key
pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout' : stdout_str})
train_begin_time = time.time()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'load into memory done, cost %s sec', 'stdout' : stdout_str})
current_dataset = {}
for name in self._dataset:
current_dataset[name] = self._dataset[name].load_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
cost_printer.done()
kagle_util.rank0_print("going to global shuffle")
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {
'master': True, 'stdout' : stdout_str,
'log_format' : 'global shuffle done, cost %s sec'})
for name in current_dataset:
current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
cost_printer.done()
# str(dataset.get_shuffle_data_size(fleet))
if self.global_config['prefetch_data']:
next_pass_time = (self._train_pass._current_train_time +
datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
for name in self._dataset:
self._dataset[name].preload_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
pure_train_begin = time.time()
for executor in self.global_config['executor']:
self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'release_memory cost %s sec'})
for name in current_dataset:
current_dataset[name].release_memory()
pure_train_cost = time.time() - pure_train_begin
if self._train_pass.is_checkpoint_pass(pass_id):
self.save_model(day, pass_id, base_key)
train_end_time = time.time()
train_cost = train_end_time - train_begin_time
other_cost = train_cost - pure_train_cost
log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
for executor in self._exector_context:
log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
log_str += '[other_cost:' + str(other_cost) + ']'
kagle_util.rank0_print(log_str)
stdout_str += kagle_util.now_time_str() + log_str
sys.stdout.write(stdout_str)
stdout_str = ""
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
return
elif not self._train_pass.next():
context['is_exit'] = True
import sys
import time
from abc import ABCMeta, abstractmethod
class Trainer(object):
__metaclass__=ABCMeta
def __init__(self, config):
self._status_processor = {}
self._context = {'status': 'uninit', 'is_exit': False}
def regist_context_processor(self, status_name, processor):
self._status_processor[status_name] = processor
def context_process(self, context):
if context['status'] in self._status_processor:
self._status_processor[context['status']](context)
else:
self.other_status_processor(context)
def other_status_processor(self, context):
print('unknown context_status:%s, do nothing' % context['status'])
time.sleep(60)
def reload_train_context(self):
pass
def run(self):
while True:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
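# --- Behavior sketch (illustration only, not the production entry point): Trainer is a
# small status machine; run() keeps calling the processor registered for the current
# context['status'] until one of them sets context['is_exit'] = True. The toy subclass
# below exercises that loop without paddle or fleet.
class ToyTrainer(Trainer):
    def __init__(self, config):
        Trainer.__init__(self, config)
        self.regist_context_processor('uninit', self.start)
        self.regist_context_processor('work', self.work)
    def start(self, context):
        context['status'] = 'work'
    def work(self, context):
        print('ToyTrainer: one unit of work, then exit')
        context['is_exit'] = True
ToyTrainer({}).run()   # prints once and returns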