Commit 58a87f49 authored by dyonghan, committed by Gitee

!15 add Graph_Convolutional_Network

Merge pull request !15 from zhengnengjin2/update
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
User-defined API for MindRecord GNN writer.
"""
import os
import pickle as pkl
import numpy as np
import scipy.sparse as sp
# parse args from command line parameter 'graph_api_args'
# args delimiter is ':'
args = os.environ['graph_api_args'].split(':')
CITESEER_PATH = args[0]
dataset_str = 'citeseer'
# profile: (num_features, feature_data_types, feature_shapes)
node_profile = (2, ["float32", "int32"], [[-1], [-1]])
edge_profile = (0, [], [])
node_ids = []

def _normalize_citeseer_features(features):
    """Row-normalize the sparse feature matrix so that each row sums to 1."""
    row_sum = np.array(features.sum(1))
    r_inv = np.power(row_sum * 1.0, -1).flatten()
    r_inv[np.isinf(r_inv)] = 0.
    r_mat_inv = sp.diags(r_inv)
    features = r_mat_inv.dot(features)
    return features


def _parse_index_file(filename):
    """Parse index file."""
    index = []
    for line in open(filename):
        index.append(int(line.strip()))
    return index

def yield_nodes(task_id=0):
    """
    Generate node data.

    Yields:
        data (dict): data row which is dict.
    """
    print("Node task is {}".format(task_id))
    names = ['x', 'y', 'tx', 'ty', 'allx', 'ally']
    objects = []
    for name in names:
        with open("{}/ind.{}.{}".format(CITESEER_PATH, dataset_str, name), 'rb') as f:
            objects.append(pkl.load(f, encoding='latin1'))
    x, y, tx, ty, allx, ally = tuple(objects)
    test_idx_reorder = _parse_index_file(
        "{}/ind.{}.test.index".format(CITESEER_PATH, dataset_str))
    test_idx_range = np.sort(test_idx_reorder)
    tx = _normalize_citeseer_features(tx)
    allx = _normalize_citeseer_features(allx)
    # Fix citeseer dataset (there are some isolated nodes in the graph).
    # Find isolated nodes, add them as zero-vecs into the right position.
    test_idx_range_full = range(min(test_idx_reorder), max(test_idx_reorder) + 1)
    tx_extended = sp.lil_matrix((len(test_idx_range_full), x.shape[1]))
    tx_extended[test_idx_range - min(test_idx_range), :] = tx
    tx = tx_extended
    ty_extended = np.zeros((len(test_idx_range_full), y.shape[1]))
    ty_extended[test_idx_range - min(test_idx_range), :] = ty
    ty = ty_extended
    features = sp.vstack((allx, tx)).tolil()
    features[test_idx_reorder, :] = features[test_idx_range, :]
    features = features.A
    labels = np.vstack((ally, ty))
    labels[test_idx_reorder, :] = labels[test_idx_range, :]
    line_count = 0
    for i, label in enumerate(labels):
        # Skip isolated nodes whose label is an all-zero vector.
        if 1 not in label.tolist():
            continue
        node = {'id': i, 'type': 0, 'feature_1': features[i].tolist(),
                'feature_2': label.tolist().index(1)}
        line_count += 1
        node_ids.append(i)
        yield node
    print('Processed {} lines for nodes.'.format(line_count))

def yield_edges(task_id=0):
    """
    Generate edge data.

    Yields:
        data (dict): data row which is dict.
    """
    print("Edge task is {}".format(task_id))
    with open("{}/ind.{}.graph".format(CITESEER_PATH, dataset_str), 'rb') as f:
        graph = pkl.load(f, encoding='latin1')
    line_count = 0
    for i in graph:
        for dst_id in graph[i]:
            if i not in node_ids:
                print('Source node {} does not exist.'.format(i))
                continue
            if dst_id not in node_ids:
                print('Destination node {} does not exist.'.format(dst_id))
                continue
            edge = {'id': line_count,
                    'src_id': i, 'dst_id': dst_id, 'type': 0}
            line_count += 1
            yield edge
    print('Processed {} lines for edges.'.format(line_count))
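
# A minimal sketch of how this module is consumed (a hypothetical standalone check that
# mirrors what graph_to_mindrecord/writer.py does): the dataset path is handed over
# through the 'graph_api_args' environment variable before import, then the
# yield_nodes/yield_edges generators are iterated. The path and package layout below
# are placeholders/assumptions.
#
#   import os
#   os.environ['graph_api_args'] = '/path/to/citeseer'     # placeholder dataset path
#   from graph_to_mindrecord.citeseer import mr_api        # assumed package layout
#   first_node = next(mr_api.yield_nodes())
#   print(first_node['id'], first_node['type'], first_node['feature_2'])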
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
User-defined API for MindRecord GNN writer.
"""
import os
import pickle as pkl
import numpy as np
import scipy.sparse as sp
# parse args from command line parameter 'graph_api_args'
# args delimiter is ':'
args = os.environ['graph_api_args'].split(':')
CORA_PATH = args[0]
dataset_str = 'cora'
# profile: (num_features, feature_data_types, feature_shapes)
node_profile = (2, ["float32", "int32"], [[-1], [-1]])
edge_profile = (0, [], [])

def _normalize_cora_features(features):
    """Row-normalize the sparse feature matrix so that each row sums to 1."""
    row_sum = np.array(features.sum(1))
    r_inv = np.power(row_sum * 1.0, -1).flatten()
    r_inv[np.isinf(r_inv)] = 0.
    r_mat_inv = sp.diags(r_inv)
    features = r_mat_inv.dot(features)
    return features


def _parse_index_file(filename):
    """Parse index file."""
    index = []
    for line in open(filename):
        index.append(int(line.strip()))
    return index

def yield_nodes(task_id=0):
    """
    Generate node data.

    Yields:
        data (dict): data row which is dict.
    """
    print("Node task is {}".format(task_id))
    names = ['tx', 'ty', 'allx', 'ally']
    objects = []
    for name in names:
        with open("{}/ind.{}.{}".format(CORA_PATH, dataset_str, name), 'rb') as f:
            objects.append(pkl.load(f, encoding='latin1'))
    tx, ty, allx, ally = tuple(objects)
    test_idx_reorder = _parse_index_file(
        "{}/ind.{}.test.index".format(CORA_PATH, dataset_str))
    test_idx_range = np.sort(test_idx_reorder)
    features = sp.vstack((allx, tx)).tolil()
    features[test_idx_reorder, :] = features[test_idx_range, :]
    features = _normalize_cora_features(features)
    features = features.A
    labels = np.vstack((ally, ty))
    labels[test_idx_reorder, :] = labels[test_idx_range, :]
    line_count = 0
    for i, label in enumerate(labels):
        node = {'id': i, 'type': 0, 'feature_1': features[i].tolist(),
                'feature_2': label.tolist().index(1)}
        line_count += 1
        yield node
    print('Processed {} lines for nodes.'.format(line_count))

def yield_edges(task_id=0):
    """
    Generate edge data.

    Yields:
        data (dict): data row which is dict.
    """
    print("Edge task is {}".format(task_id))
    with open("{}/ind.{}.graph".format(CORA_PATH, dataset_str), 'rb') as f:
        graph = pkl.load(f, encoding='latin1')
    line_count = 0
    for i in graph:
        for dst_id in graph[i]:
            edge = {'id': line_count,
                    'src_id': i, 'dst_id': dst_id, 'type': 0}
            line_count += 1
            yield edge
    print('Processed {} lines for edges.'.format(line_count))
# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Graph data convert tool for MindRecord.
"""
import numpy as np
__all__ = ['GraphMapSchema']

class GraphMapSchema:
    """
    Class is for transformation from graph data to MindRecord.
    """

    def __init__(self):
        """
        init
        """
        self.num_node_features = 0
        self.num_edge_features = 0
        self.union_schema_in_mindrecord = {
            "first_id": {"type": "int64"},
            "second_id": {"type": "int64"},
            "third_id": {"type": "int64"},
            "type": {"type": "int32"},
            "attribute": {"type": "string"},  # 'n' for node, 'e' for edge
            "node_feature_index": {"type": "int32", "shape": [-1]},
            "edge_feature_index": {"type": "int32", "shape": [-1]}
        }

    def get_schema(self):
        """
        Get schema
        """
        return self.union_schema_in_mindrecord

    def set_node_feature_profile(self, num_features, features_data_type, features_shape):
        """
        Set node features profile
        """
        if num_features != len(features_data_type) or num_features != len(features_shape):
            raise ValueError("Node feature profile does not match.")
        self.num_node_features = num_features
        for i in range(num_features):
            k = i + 1
            field_key = 'node_feature_' + str(k)
            field_value = {"type": features_data_type[i], "shape": features_shape[i]}
            self.union_schema_in_mindrecord[field_key] = field_value

    def set_edge_feature_profile(self, num_features, features_data_type, features_shape):
        """
        Set edge features profile
        """
        if num_features != len(features_data_type) or num_features != len(features_shape):
            raise ValueError("Edge feature profile does not match.")
        self.num_edge_features = num_features
        for i in range(num_features):
            k = i + 1
            field_key = 'edge_feature_' + str(k)
            field_value = {"type": features_data_type[i], "shape": features_shape[i]}
            self.union_schema_in_mindrecord[field_key] = field_value

    def transform_node(self, node):
        """
        Executes transformation from node data to union format.

        Args:
            node(schema): node's data

        Returns:
            graph data with union schema
        """
        node_graph = {"first_id": node["id"], "second_id": 0, "third_id": 0, "attribute": 'n', "type": node["type"],
                      "node_feature_index": []}
        for i in range(self.num_node_features):
            k = i + 1
            node_field_key = 'feature_' + str(k)
            graph_field_key = 'node_feature_' + str(k)
            graph_field_type = self.union_schema_in_mindrecord[graph_field_key]["type"]
            if node_field_key in node:
                node_graph["node_feature_index"].append(k)
                node_graph[graph_field_key] = np.reshape(np.array(node[node_field_key], dtype=graph_field_type), [-1])
            else:
                node_graph[graph_field_key] = np.reshape(np.array([0], dtype=graph_field_type), [-1])
        if node_graph["node_feature_index"]:
            node_graph["node_feature_index"] = np.array(node_graph["node_feature_index"], dtype="int32")
        else:
            node_graph["node_feature_index"] = np.array([-1], dtype="int32")
        node_graph["edge_feature_index"] = np.array([-1], dtype="int32")
        for i in range(self.num_edge_features):
            k = i + 1
            graph_field_key = 'edge_feature_' + str(k)
            graph_field_type = self.union_schema_in_mindrecord[graph_field_key]["type"]
            node_graph[graph_field_key] = np.reshape(np.array([0], dtype=graph_field_type), [-1])
        return node_graph

    def transform_edge(self, edge):
        """
        Executes transformation from edge data to union format.

        Args:
            edge(schema): edge's data

        Returns:
            graph data with union schema
        """
        edge_graph = {"first_id": edge["id"], "second_id": edge["src_id"], "third_id": edge["dst_id"],
                      "attribute": 'e', "type": edge["type"], "edge_feature_index": []}
        for i in range(self.num_edge_features):
            k = i + 1
            edge_field_key = 'feature_' + str(k)
            graph_field_key = 'edge_feature_' + str(k)
            graph_field_type = self.union_schema_in_mindrecord[graph_field_key]["type"]
            if edge_field_key in edge:
                edge_graph["edge_feature_index"].append(k)
                edge_graph[graph_field_key] = np.reshape(np.array(edge[edge_field_key], dtype=graph_field_type), [-1])
            else:
                edge_graph[graph_field_key] = np.reshape(np.array([0], dtype=graph_field_type), [-1])
        if edge_graph["edge_feature_index"]:
            edge_graph["edge_feature_index"] = np.array(edge_graph["edge_feature_index"], dtype="int32")
        else:
            edge_graph["edge_feature_index"] = np.array([-1], dtype="int32")
        edge_graph["node_feature_index"] = np.array([-1], dtype="int32")
        for i in range(self.num_node_features):
            k = i + 1
            graph_field_key = 'node_feature_' + str(k)
            graph_field_type = self.union_schema_in_mindrecord[graph_field_key]["type"]
            edge_graph[graph_field_key] = np.array([0], dtype=graph_field_type)
        return edge_graph
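

# A minimal, self-contained usage sketch (hypothetical toy values, mirroring the
# node_profile/edge_profile tuples used by the mr_api modules in this PR): build the
# union schema for two node features and no edge features, then transform one node row.
if __name__ == '__main__':
    demo_schema = GraphMapSchema()
    demo_schema.set_node_feature_profile(2, ["float32", "int32"], [[-1], [-1]])
    demo_schema.set_edge_feature_profile(0, [], [])
    toy_node = {'id': 0, 'type': 0, 'feature_1': [0.1, 0.9], 'feature_2': 3}
    row = demo_schema.transform_node(toy_node)
    print(sorted(demo_schema.get_schema().keys()))
    print(row['first_id'], row['node_feature_1'], row['node_feature_2'], row['node_feature_index'])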
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
######################## write mindrecord example ########################
Write mindrecord by data dictionary:
python writer.py --mindrecord_script /YourScriptPath ...
"""
import argparse
import os
import time
from importlib import import_module
from multiprocessing import Pool
import shutil
from mindspore.mindrecord import FileWriter
from .graph_map_schema import GraphMapSchema
# import cora.mr_api

def read_args():
    """
    read args
    """
    parser = argparse.ArgumentParser(description='Mind record writer')
    parser.add_argument('--mindrecord_script', type=str, default="template",
                        help='path where script is saved')
    parser.add_argument('--mindrecord_file', type=str, default="/tmp/mindrecord/xyz",
                        help='written file name prefix')
    parser.add_argument('--mindrecord_partitions', type=int, default=1,
                        help='number of written files')
    parser.add_argument('--mindrecord_header_size_by_bit', type=int, default=24,
                        help='mindrecord file header size')
    parser.add_argument('--mindrecord_page_size_by_bit', type=int, default=25,
                        help='mindrecord file page size')
    parser.add_argument('--mindrecord_workers', type=int, default=8,
                        help='number of parallel workers')
    parser.add_argument('--num_node_tasks', type=int, default=1,
                        help='number of node tasks')
    parser.add_argument('--num_edge_tasks', type=int, default=1,
                        help='number of edge tasks')
    parser.add_argument('--graph_api_args', type=str, default="/tmp/nodes.csv:/tmp/edges.csv",
                        help='nodes and edges data file, csv format with header.')
    ret_args = parser.parse_args()
    return ret_args

def run(cfg):
    """Convert the raw graph dataset described by cfg into MindRecord files."""
    args = read_args()
    # create the output directory
    cur_path = os.getcwd()
    M_PATH = os.path.join(cur_path, cfg.MINDRECORD_PATH)
    if os.path.exists(M_PATH):
        shutil.rmtree(M_PATH)  # remove the stale output directory
    os.mkdir(M_PATH)
    cfg.SRC_PATH = os.path.join(cur_path, cfg.SRC_PATH)
    # override the command-line defaults with the values from cfg
    args.mindrecord_script = cfg.DATASET_NAME
    args.mindrecord_file = os.path.join(cfg.MINDRECORD_PATH, cfg.DATASET_NAME)
    args.mindrecord_partitions = cfg.mindrecord_partitions
    args.mindrecord_header_size_by_bit = cfg.mindrecord_header_size_by_bit
    args.mindrecord_page_size_by_bit = cfg.mindrecord_page_size_by_bit
    args.graph_api_args = cfg.SRC_PATH
    start_time = time.time()
    # pass mr_api arguments
    os.environ['graph_api_args'] = args.graph_api_args
    try:
        mr_api = import_module('graph_to_mindrecord.' + args.mindrecord_script + '.mr_api')
    except ModuleNotFoundError:
        raise RuntimeError("Unknown module path: {}".format(args.mindrecord_script + '.mr_api'))
    # init graph schema
    graph_map_schema = GraphMapSchema()
    num_features, feature_data_types, feature_shapes = mr_api.node_profile
    graph_map_schema.set_node_feature_profile(num_features, feature_data_types, feature_shapes)
    num_features, feature_data_types, feature_shapes = mr_api.edge_profile
    graph_map_schema.set_edge_feature_profile(num_features, feature_data_types, feature_shapes)
    graph_schema = graph_map_schema.get_schema()
    # ----------------------------------------------------------------------------------------------

    def init_writer(mr_schema):
        """
        init writer
        """
        print("Init writer ...")
        mr_writer = FileWriter(args.mindrecord_file, args.mindrecord_partitions)
        # set the header size
        if args.mindrecord_header_size_by_bit != 24:
            header_size = 1 << args.mindrecord_header_size_by_bit
            mr_writer.set_header_size(header_size)
        # set the page size
        if args.mindrecord_page_size_by_bit != 25:
            page_size = 1 << args.mindrecord_page_size_by_bit
            mr_writer.set_page_size(page_size)
        # create the schema
        mr_writer.add_schema(mr_schema, "mindrecord_graph_schema")
        # open file and set header
        mr_writer.open_and_set_header()
        return mr_writer

    def run_parallel_workers():
        """
        run parallel workers
        """
        # set number of workers
        num_workers = args.mindrecord_workers
        task_list = list(range(args.num_node_tasks))
        if num_workers > args.num_node_tasks:
            num_workers = args.num_node_tasks
        if os.name == 'nt':
            for window_task_id in task_list:
                exec_task(window_task_id, False)
        elif args.num_node_tasks > 1:
            with Pool(num_workers) as p:
                p.map(exec_task, task_list)
        else:
            exec_task(0, False)

    def exec_task(task_id, parallel_writer=True):
        """
        Execute task with specified task id
        """
        print("exec task {}, parallel: {} ...".format(task_id, parallel_writer))
        imagenet_iter = mindrecord_dict_data(task_id)
        batch_size = 512
        transform_count = 0
        while True:
            data_list = []
            try:
                for _ in range(batch_size):
                    data = next(imagenet_iter)
                    if 'dst_id' in data:
                        data = graph_map_schema.transform_edge(data)
                    else:
                        data = graph_map_schema.transform_node(data)
                    data_list.append(data)
                    transform_count += 1
                writer.write_raw_data(data_list, parallel_writer=parallel_writer)
                print("transformed {} records...".format(transform_count))
            except StopIteration:
                if data_list:
                    writer.write_raw_data(data_list, parallel_writer=parallel_writer)
                    print("transformed {} records...".format(transform_count))
                break

    # -------------------------------------------------------------------------------------
    # init writer
    writer = init_writer(graph_schema)
    # write nodes data
    mindrecord_dict_data = mr_api.yield_nodes
    run_parallel_workers()
    # write edges data
    mindrecord_dict_data = mr_api.yield_edges
    run_parallel_workers()
    # writer wrap up
    writer.commit()
    end_time = time.time()
    print("--------------------------------------------")
    print("END. Total time: {}".format(end_time - start_time))
    print("--------------------------------------------")
import time
import argparse
import os
from easydict import EasyDict as edict
import numpy as np
from mindspore import context
from src.gcn import GCN, LossAccuracyWrapper, TrainNetWrapper
from src.config import ConfigGCN
from src.dataset import get_adj_features_labels, get_mask
from graph_to_mindrecord.writer import run
os.environ['DEVICE_ID'] = '6'
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend", save_graphs=False)

def train(args_opt):
    """Train model."""
    np.random.seed(args_opt.seed)
    config = ConfigGCN()
    adj, feature, label = get_adj_features_labels(args_opt.data_dir)
    nodes_num = label.shape[0]
    train_mask = get_mask(nodes_num, 0, args_opt.train_nodes_num)
    eval_mask = get_mask(nodes_num, args_opt.train_nodes_num, args_opt.train_nodes_num + args_opt.eval_nodes_num)
    test_mask = get_mask(nodes_num, nodes_num - args_opt.test_nodes_num, nodes_num)
    class_num = label.shape[1]
    gcn_net = GCN(config, adj, feature, class_num)
    gcn_net.add_flags_recursive(fp16=True)
    eval_net = LossAccuracyWrapper(gcn_net, label, eval_mask, config.weight_decay)
    test_net = LossAccuracyWrapper(gcn_net, label, test_mask, config.weight_decay)
    train_net = TrainNetWrapper(gcn_net, label, train_mask, config)
    loss_list = []
    for epoch in range(config.epochs):
        t = time.time()
        train_net.set_train()
        train_result = train_net()
        train_loss = train_result[0].asnumpy()
        train_accuracy = train_result[1].asnumpy()
        eval_net.set_train(False)
        eval_result = eval_net()
        eval_loss = eval_result[0].asnumpy()
        eval_accuracy = eval_result[1].asnumpy()
        loss_list.append(eval_loss)
        if epoch % 10 == 0:
            print("Epoch:", '%04d' % epoch, "train_loss=", "{:.5f}".format(train_loss),
                  "train_acc=", "{:.5f}".format(train_accuracy), "val_loss=", "{:.5f}".format(eval_loss),
                  "val_acc=", "{:.5f}".format(eval_accuracy), "time=", "{:.5f}".format(time.time() - t))
        # stop early once the validation loss stops improving over the last early_stopping epochs
        if epoch > config.early_stopping and loss_list[-1] > np.mean(loss_list[-(config.early_stopping + 1):-1]):
            print("Early stopping...")
            break
    t_test = time.time()
    test_net.set_train(False)
    test_result = test_net()
    test_loss = test_result[0].asnumpy()
    test_accuracy = test_result[1].asnumpy()
    print("Test set results:", "loss=", "{:.5f}".format(test_loss),
          "accuracy=", "{:.5f}".format(test_accuracy), "time=", "{:.5f}".format(time.time() - t_test))


if __name__ == '__main__':
    # ------------------------ define variables ------------------------------
    parser = argparse.ArgumentParser(description='GCN')
    parser.add_argument('--data_url', type=str, default='./data', help='Dataset directory')
    parser.add_argument('--train_url', type=str, default=None, help='Train output url')
    args, unknown = parser.parse_known_args()

    import moxing as mox
    mox.file.copy_parallel(args.data_url, dst_url='./data')  # copy data from the OBS bucket into the container

    dataname = 'cora'
    datadir_save = './data_mr'
    datadir = os.path.join(datadir_save, dataname)
    cfg = edict({
        'SRC_PATH': './data',
        'MINDRECORD_PATH': datadir_save,
        'DATASET_NAME': dataname,  # citeseer, cora
        'mindrecord_partitions': 1,
        'mindrecord_header_size_by_bit': 18,
        'mindrecord_page_size_by_bit': 20,
        'data_dir': datadir,
        'seed': 123,
        'train_nodes_num': 140,
        'eval_nodes_num': 500,
        'test_nodes_num': 1000
    })
    # convert the raw dataset to MindRecord
    print("============== Graph To Mindrecord ==============")
    run(cfg)
    # training
    print("============== Starting Training ==============")
    train(cfg)
    # src_url is local; copy the container output into the OBS bucket
    mox.file.copy_parallel(src_url='data_mr', dst_url=cfg.MINDRECORD_PATH)