Commit f99dde81 · Author: haidfs

TransE improved

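Parent: d1eac725
This project mainly implements classic knowledge-graph algorithms from papers I have read:
1. TransE is a classic knowledge-representation algorithm for knowledge graphs. The project implements the training code (multi-process communication version) and the test code.
Code for further papers will be added as I continue reading.
2. TransE paper: https://www.utc.fr/~bordesan/dokuwiki/_media/en/transe_nips13.pdf
3. This project annotates and modifies wuxiyu's TransE code; thanks to him for his work: https://github.com/wuxiyu/transE
4. Explanation of SGD in TransE: https://blog.csdn.net/weixin_42348333/article/details/89598144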
\ No newline at end of file
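This project mainly implements classic knowledge-graph algorithms from papers I have read:
1. TransE is a classic knowledge-representation algorithm for knowledge graphs. The project implements the training code (multi-process communication version) and the test code.
Code for further papers will be added as I continue reading.
2. The data files are too large to upload. Please download data.zip from https://github.com/thunlp/KB2E and extract it into the project's data directory.
3. TransE paper: https://www.utc.fr/~bordesan/dokuwiki/_media/en/transe_nips13.pdf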
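### Training
#### Simple version
./train_fb15k.sh 0
The training code is implemented in plain Python only.
#### Manager version
./train_fb15k.sh 1
An instance of the TransE class is passed between the worker processes.
#### Queue version
./train_fb15k.sh 2
The TransE training data is passed through a queue, which reduces process overhead and speeds up training.
Run the tests only after training has finished.
### Testing
#### TestTransEMpQueue
python TestTransEMpQueue.py
Multi-process, queue-based test acceleration. The gain is modest: each test case takes about 0.5 s, and the full test takes nearly 5 h.
#### TestMainTF
python TestMainTF.py
TensorFlow combined with multiprocessing. The speed-up is significant: the full test takes only about 8 min.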
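The speed-up of the TF version comes largely from scoring all candidate entities for a test triple in one vectorized operation instead of looping over entities in Python. The following is a minimal NumPy sketch of that idea, not the project's actual code: the function name `rank_candidates` and the toy embeddings are assumptions made for illustration, while the (head, tail, relation) triple order and the L1/L2 choice follow the sources in this commit.

```python
import numpy as np


def rank_candidates(entity_emb, relation_emb, triple, score_func="L1"):
    """Return candidate entity ids sorted from smallest to largest distance.

    entity_emb has shape [n_entity, dim], relation_emb has shape
    [n_relation, dim], and triple is (head_id, tail_id, relation_id).
    """
    h_id, t_id, r_id = triple
    head = entity_emb[h_id]
    tail = entity_emb[t_id]
    relation = relation_emb[r_id]

    # Broadcasting: every row of the [n_entity, dim] matrix is combined
    # with the [dim] vectors, so all candidates are scored at once.
    head_diff = entity_emb + relation - tail   # replace the head
    tail_diff = head + relation - entity_emb   # replace the tail

    if score_func == "L1":
        head_score = np.abs(head_diff).sum(axis=1)
        tail_score = np.abs(tail_diff).sum(axis=1)
    else:  # squared L2 distance
        head_score = np.square(head_diff).sum(axis=1)
        tail_score = np.square(tail_diff).sum(axis=1)

    # A stable sort keeps ties in entity-id order.
    return (np.argsort(head_score, kind="stable"),
            np.argsort(tail_score, kind="stable"))


# Toy data (hypothetical): three 2-d entities and one relation.
entity_emb = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0]])
relation_emb = np.array([[1.0, 0.0]])
heads, tails = rank_candidates(entity_emb, relation_emb, (0, 1, 0))
print(heads, tails)
```

The position of the true head (or tail) id in the returned arrays is its raw rank for that test triple.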
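### Final test results
FB15k, epochs: 2000

| | MeanRank (raw) | MeanRank (filter) | Hits@10 (raw, %) | Hits@10 (filter, %) |
|---|---|---|---|---|
| head | 320.743 | 192.152 | 29.7 | 41.2 |
| tail | 236.984 | 153.431 | 36.1 | 46.2 |
| average | 278.863 | 172.792 | 32.9 | 43.7 |
| paper | 243 | 125 | 34.9 | 47.1 |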
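The raw and filter columns differ only in whether corrupted triples that are themselves known facts are counted against the correct entity. The sketch below shows one way to derive both ranks from a best-first candidate list, and then MeanRank and Hits@10 from the ranks. The helper names `raw_and_filtered_rank` and `summarize` and the toy values are assumptions for illustration; ranks are 0-based here, so "rank < 10" means "in the top 10", as in the evaluation code of this commit.

```python
def raw_and_filtered_rank(sorted_candidates, true_id, make_triple, golden_triples):
    """Count how many candidates are ranked ahead of the true entity.

    sorted_candidates: entity ids ordered from best to worst score.
    make_triple: builds the corrupted triple for a candidate id.
    golden_triples: set of all known true triples (train, valid and test).
    """
    raw_rank = 0
    filtered_rank = 0
    for candidate in sorted_candidates:
        if candidate == true_id:
            break
        raw_rank += 1
        # In the filtered setting, candidates that form a known true
        # triple are not counted against the correct entity.
        if make_triple(candidate) not in golden_triples:
            filtered_rank += 1
    return raw_rank, filtered_rank


def summarize(ranks, k=10):
    """Return (mean rank, Hits@k) for a list of 0-based ranks."""
    mean_rank = sum(ranks) / len(ranks)
    hits_at_k = sum(1 for r in ranks if r < k) / len(ranks)
    return mean_rank, hits_at_k


# Toy example (hypothetical ids): predict the head of (7, 2, 0),
# with triples stored as (head, tail, relation).
golden = {(5, 2, 0), (7, 2, 0)}
print(raw_and_filtered_rank([5, 3, 7, 1], 7, lambda c: (c, 2, 0), golden))
print(summarize([0, 9, 10, 21]))
```

Because the filtered rank can never exceed the raw rank, the filter columns in a results table should always show a lower MeanRank and a higher Hits@10 than the raw columns.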
\ No newline at end of file
import os

import pandas as pd


class KnowledgeGraph:
    def __init__(self, data_dir):
        # TensorFlow cannot convert a Tensor directly into a Python string, but it
        # can convert it to a NumPy type. The training, validation and test triples
        # are therefore stored as id triples rather than string triples.
        self.data_dir = data_dir
        self.entity_dict = {}
        self.entities = []
        self.relation_dict = {}
        self.n_entity = 0
        self.n_relation = 0
        self.training_triples = []  # list of triples in the form of (h, t, r)
        self.validation_triples = []
        self.test_triples = []
        self.n_training_triple = 0
        self.n_validation_triple = 0
        self.n_test_triple = 0
        # load dicts and triples
        self.load_dicts()
        self.load_triples()
        # construct pools after loading
        self.training_triple_pool = set(self.training_triples)
        self.golden_triple_pool = (set(self.training_triples)
                                   | set(self.validation_triples)
                                   | set(self.test_triples))

    def load_dicts(self):
        entity_dict_file = 'entity2id.txt'
        relation_dict_file = 'relation2id.txt'
        print('-----Loading entity dict-----')
        entity_df = pd.read_table(
            os.path.join(self.data_dir, entity_dict_file), header=None)
        self.entity_dict = dict(zip(entity_df[0], entity_df[1]))
        self.n_entity = len(self.entity_dict)
        self.entities = list(self.entity_dict.values())
        print('#entity: {}'.format(self.n_entity))
        print('-----Loading relation dict-----')
        relation_df = pd.read_table(
            os.path.join(self.data_dir, relation_dict_file), header=None)
        self.relation_dict = dict(zip(relation_df[0], relation_df[1]))
        self.n_relation = len(self.relation_dict)
        print('#relation: {}'.format(self.n_relation))

    def load_triples(self):
        training_file = 'train.txt'
        validation_file = 'valid.txt'
        test_file = 'test.txt'
        print('-----Loading training triples-----')
        training_df = pd.read_table(
            os.path.join(self.data_dir, training_file), header=None)
        # Map entity and relation names to ids; columns are head, tail, relation.
        self.training_triples = list(zip([self.entity_dict[h] for h in training_df[0]],
                                         [self.entity_dict[t] for t in training_df[1]],
                                         [self.relation_dict[r] for r in training_df[2]]))
        self.n_training_triple = len(self.training_triples)
        print('#training triple: {}'.format(self.n_training_triple))
        print('-----Loading validation triples-----')
        validation_df = pd.read_table(
            os.path.join(self.data_dir, validation_file), header=None)
        self.validation_triples = list(zip([self.entity_dict[h] for h in validation_df[0]],
                                           [self.entity_dict[t] for t in validation_df[1]],
                                           [self.relation_dict[r] for r in validation_df[2]]))
        self.n_validation_triple = len(self.validation_triples)
        print('#validation triple: {}'.format(self.n_validation_triple))
        print('-----Loading test triples------')
        test_df = pd.read_table(
            os.path.join(self.data_dir, test_file), header=None)
        self.test_triples = list(zip([self.entity_dict[h] for h in test_df[0]],
                                     [self.entity_dict[t] for t in test_df[1]],
                                     [self.relation_dict[r] for r in test_df[2]]))
        self.n_test_triple = len(self.test_triples)
        print('#test triple: {}'.format(self.n_test_triple))
\ No newline at end of file
import argparse
import logging

import tensorflow as tf

from TestDatasetTF import KnowledgeGraph
from TestModelTF import TransE
from TestTransEMpQueue import get_dict_from_vector_file


def main():
    parser = argparse.ArgumentParser(description='TransE')
    parser.add_argument('--data_dir', type=str, default=r'./data/FB15k/')
    parser.add_argument('--score_func', type=str, default='L1')
    parser.add_argument('--n_rank_calculator', type=int, default=24)
    args = parser.parse_args()
    print(args)
    kg = KnowledgeGraph(data_dir=args.data_dir)
    # Load the trained entity and relation vectors written by the training step.
    entity_vector_file = "data/entityVector.txt"
    entity_vector_dyct = get_dict_from_vector_file(entity_vector_file)
    relation_vector_file = "data/relationVector.txt"
    relation_vector_dyct = get_dict_from_vector_file(relation_vector_file)
    logging.info("********** Start Test **********")
    kge_model = TransE(
        kg=kg,
        score_func=args.score_func,
        n_rank_calculator=args.n_rank_calculator,
        entity_vector_dict=entity_vector_dyct,
        rels_vector_dict=relation_vector_dyct)
    gpu_config = tf.GPUOptions(allow_growth=True)
    sess_config = tf.ConfigProto(gpu_options=gpu_config)
    with tf.Session(config=sess_config) as sess:
        print('-----Initializing tf graph-----')
        tf.global_variables_initializer().run()
        print('-----Initialization accomplished-----')
        kge_model.check_norm()
        kge_model.launch_evaluation(session=sess)


if __name__ == '__main__':
    main()
\ No newline at end of file
import multiprocessing as mp
import timeit

import numpy as np
import tensorflow as tf

from TestDatasetTF import KnowledgeGraph


class TransE:
    def __init__(self, kg: KnowledgeGraph,
                 score_func,
                 n_rank_calculator, entity_vector_dict, rels_vector_dict):
        self.kg = kg
        self.score_func = score_func
        self.n_rank_calculator = n_rank_calculator
        self.entity_vector_dict = entity_vector_dict
        self.rels_vector_dict = rels_vector_dict
        self.entity_embedding = None
        self.relation_embedding = None
        # ops for evaluation
        self.eval_triple = tf.placeholder(dtype=tf.int32, shape=[3])
        self.idx_head_prediction = None
        self.idx_tail_prediction = None
        self.build_entity_embedding()
        self.build_eval_graph()

    def build_entity_embedding(self):
        self.entity_embedding = np.array(
            list(self.entity_vector_dict.values()))
        self.relation_embedding = np.array(
            list(self.rels_vector_dict.values()))

    def build_eval_graph(self):
        with tf.name_scope('evaluation'):
            self.idx_head_prediction, self.idx_tail_prediction = self.evaluate(
                self.eval_triple)

    def evaluate(self, eval_triple):
        with tf.name_scope('lookup'):
            head = tf.nn.embedding_lookup(
                self.entity_embedding, eval_triple[0])
            tail = tf.nn.embedding_lookup(
                self.entity_embedding, eval_triple[1])
            relation = tf.nn.embedding_lookup(
                self.relation_embedding, eval_triple[2])
        with tf.name_scope('link'):
            # h, r and t are single dim-sized vectors, while self.entity_embedding
            # has shape [n, dim]. Plain Python lists of different shapes cannot be
            # added or subtracted, but np.array and TF tensors broadcast: every row
            # of self.entity_embedding is combined with h, r and t.
            distance_head_prediction = self.entity_embedding + relation - tail
            distance_tail_prediction = head + relation - self.entity_embedding
        with tf.name_scope('rank'):
            # tf.nn.top_k returns indices ordered from the largest distance to the
            # smallest, so calculate_rank walks the result in reverse.
            if self.score_func == 'L1':  # L1 score
                _, idx_head_prediction = tf.nn.top_k(tf.reduce_sum(
                    tf.abs(distance_head_prediction), axis=1), k=self.kg.n_entity)
                _, idx_tail_prediction = tf.nn.top_k(tf.reduce_sum(
                    tf.abs(distance_tail_prediction), axis=1), k=self.kg.n_entity)
            else:  # L2 score
                _, idx_head_prediction = tf.nn.top_k(tf.reduce_sum(
                    tf.square(distance_head_prediction), axis=1), k=self.kg.n_entity)
                _, idx_tail_prediction = tf.nn.top_k(tf.reduce_sum(
                    tf.square(distance_tail_prediction), axis=1), k=self.kg.n_entity)
        return idx_head_prediction, idx_tail_prediction
    def launch_evaluation(self, session):
        eval_result_queue = mp.JoinableQueue()
        rank_result_queue = mp.Queue()
        print('-----Start evaluation-----')
        start = timeit.default_timer()
        # Start the worker processes that turn sorted predictions into ranks.
        for _ in range(self.n_rank_calculator):
            mp.Process(
                target=self.calculate_rank,
                kwargs={
                    'in_queue': eval_result_queue,
                    'out_queue': rank_result_queue}).start()
        n_used_eval_triple = 0
        for eval_triple in self.kg.test_triples:
            idx_head_prediction, idx_tail_prediction = session.run(
                fetches=[self.idx_head_prediction, self.idx_tail_prediction],
                feed_dict={self.eval_triple: eval_triple})
            eval_result_queue.put((eval_triple, idx_head_prediction, idx_tail_prediction))
            n_used_eval_triple += 1
            print(
                '[{:.3f}s] #evaluation triple: {}/{}'.format(
                    timeit.default_timer() - start,
                    n_used_eval_triple,
                    self.kg.n_test_triple),
                end='\r')
        print()
        # One None sentinel per worker tells it to exit.
        for _ in range(self.n_rank_calculator):
            eval_result_queue.put(None)
        print('-----Joining all rank calculator-----')
        eval_result_queue.join()
        print('-----All rank calculation accomplished-----')
        print('-----Obtaining evaluation results-----')
        # Raw
        head_meanrank_raw = 0
        head_hits10_raw = 0
        tail_meanrank_raw = 0
        tail_hits10_raw = 0
        # Filter
        head_meanrank_filter = 0
        head_hits10_filter = 0
        tail_meanrank_filter = 0
        tail_hits10_filter = 0
        # Ranks are 0-based, so "rank < 10" means the entity is in the top 10.
        for _ in range(n_used_eval_triple):
            head_rank_raw, tail_rank_raw, head_rank_filter, tail_rank_filter = rank_result_queue.get()
            head_meanrank_raw += head_rank_raw
            if head_rank_raw < 10:
                head_hits10_raw += 1
            tail_meanrank_raw += tail_rank_raw
            if tail_rank_raw < 10:
                tail_hits10_raw += 1
            head_meanrank_filter += head_rank_filter
            if head_rank_filter < 10:
                head_hits10_filter += 1
            tail_meanrank_filter += tail_rank_filter
            if tail_rank_filter < 10:
                tail_hits10_filter += 1
        print('-----Raw-----')
        head_meanrank_raw /= n_used_eval_triple
        head_hits10_raw /= n_used_eval_triple
        tail_meanrank_raw /= n_used_eval_triple
        tail_hits10_raw /= n_used_eval_triple
        print('-----Head prediction-----')
        print('MeanRank: {:.3f}, Hits@10: {:.3f}'.format(
            head_meanrank_raw, head_hits10_raw))
        print('-----Tail prediction-----')
        print('MeanRank: {:.3f}, Hits@10: {:.3f}'.format(
            tail_meanrank_raw, tail_hits10_raw))
        print('------Average------')
        print('MeanRank: {:.3f}, Hits@10: {:.3f}'.format(
            (head_meanrank_raw + tail_meanrank_raw) / 2,
            (head_hits10_raw + tail_hits10_raw) / 2))
        print('-----Filter-----')
        head_meanrank_filter /= n_used_eval_triple
        head_hits10_filter /= n_used_eval_triple
        tail_meanrank_filter /= n_used_eval_triple
        tail_hits10_filter /= n_used_eval_triple
        print('-----Head prediction-----')
        print('MeanRank: {:.3f}, Hits@10: {:.3f}'.format(
            head_meanrank_filter, head_hits10_filter))
        print('-----Tail prediction-----')
        print('MeanRank: {:.3f}, Hits@10: {:.3f}'.format(
            tail_meanrank_filter, tail_hits10_filter))
        print('-----Average-----')
        print('MeanRank: {:.3f}, Hits@10: {:.3f}'.format(
            (head_meanrank_filter + tail_meanrank_filter) / 2,
            (head_hits10_filter + tail_hits10_filter) / 2))
        print('cost time: {:.3f}s'.format(timeit.default_timer() - start))
        print('-----Finish evaluation-----')
    def calculate_rank(self, in_queue, out_queue):
        while True:
            idx_predictions = in_queue.get()
            if idx_predictions is None:
                # Sentinel: no more work for this process.
                in_queue.task_done()
                return
            else:
                eval_triple, idx_head_prediction, idx_tail_prediction = idx_predictions
                head, tail, relation = eval_triple
                head_rank_raw = 0
                tail_rank_raw = 0
                head_rank_filter = 0
                tail_rank_filter = 0
                # The predictions are sorted by descending distance, so iterate in
                # reverse to visit the closest candidates first.
                for candidate in idx_head_prediction[::-1]:
                    if candidate == head:
                        break
                    else:
                        head_rank_raw += 1
                        if (candidate, tail, relation) in self.kg.golden_triple_pool:
                            continue
                        else:
                            head_rank_filter += 1
                for candidate in idx_tail_prediction[::-1]:
                    if candidate == tail:
                        break
                    else:
                        tail_rank_raw += 1
                        if (head, candidate, relation) in self.kg.golden_triple_pool:
                            continue
                        else:
                            tail_rank_filter += 1
                out_queue.put(
                    (head_rank_raw,
                     tail_rank_raw,
                     head_rank_filter,
                     tail_rank_filter))
                in_queue.task_done()
    def check_norm(self):
        print('-----Check norm-----')
        entity_embedding = self.entity_embedding
        relation_embedding = self.relation_embedding
        entity_norm = np.linalg.norm(entity_embedding, ord=2, axis=1)
        relation_norm = np.linalg.norm(relation_embedding, ord=2, axis=1)
        # print('entity norm: {} relation norm: {}'.format(entity_norm, relation_norm))
\ No newline at end of file
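import operator

from numpy import array, linalg


class Test:
    '''Basic evaluation procedure.

    Assume the knowledge base contains n entities. The evaluation then runs as follows:
    1. For each test triple a, replace its head (or tail) entity with every entity in
       the knowledge base in turn, which yields n triples.
    2. Compute the energy of each of these n triples; in TransE this is the norm of
       h + r - t. This gives n energy values, one per triple.
    3. Sort the n energy values in ascending order.
    4. Record the position of the original triple a in the sorted list.
    5. Repeat the steps above for every triple in the test set.
    6. Average the positions of the correct triples; this value is the Mean Rank.
    7. Compute the proportion of correct triples ranked in the top 10; this value is Hits@10.

    These are the two evaluation metrics: a lower Mean Rank is better and a higher
    Hits@10 is better. This code does not compute Hits@10, and Python is slow for
    this amount of computation. Readers are advised to use the Fast_TransX code from
    Tsinghua University instead: it is written in C++, performs well, and produces
    training and test results quickly.
    '''

    def __init__(self, entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
                 triple_list_test,
                 label="head", is_fit=False):
        # Despite their names, entity_list and relation_list are dicts: name -> vector.
        self.entity_list = {}
        self.relation_list = {}
        for name, vec in zip(entity_list, entity_vector_list):
            self.entity_list[name] = vec
        for name, vec in zip(relation_list, relation_vector_list):
            self.relation_list[name] = vec
        self.triple_list_train = triple_list_train
        self.triple_list_test = triple_list_test
        self.rank = []
        self.label = label
        self.is_fit = is_fit

    def write_rank(self, path):
        print("Writing ranks")
        # Each row: test triple, correct entity, top-ranked entity, rank.
        with open(path, 'w') as file:
            for r in self.rank:
                file.write(str(r[0]) + "\t")
                file.write(str(r[1]) + "\t")
                file.write(str(r[2]) + "\t")
                file.write(str(r[3]) + "\n")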
    def get_rank(self):
        cou = 0
        for triplet in self.triple_list_test:
            rank_list = {}
            for entity_temp in self.entity_list.keys():
                if self.label == "head":
                    corrupted_triplet = (entity_temp, triplet[1], triplet[2])
                    if self.is_fit and (corrupted_triplet in self.triple_list_train):
                        continue
                    rank_list[entity_temp] = distance(self.entity_list[entity_temp], self.entity_list[triplet[1]],
                                                      self.relation_list[triplet[2]])
                else:  # replace the head or the tail entity, depending on the label, and compute the distance
                    corrupted_triplet = (triplet[0], entity_temp, triplet[2])
                    if self.is_fit and (corrupted_triplet in self.triple_list_train):
                        continue
                    rank_list[entity_temp] = distance(self.entity_list[triplet[0]], self.entity_list[entity_temp],
                                                      self.relation_list[triplet[2]])
            # Sort ascending by the distance value (item index 1).
            name_rank = sorted(rank_list.items(), key=operator.itemgetter(1))
            if self.label == 'head':
                num_tri = 0
            else:
                num_tri = 1
            # x is the 1-based rank of the correct entity.
            x = 1
            for i in name_rank:
                if i[0] == triplet[num_tri]:
                    break
                x += 1
            self.rank.append((triplet, triplet[num_tri], name_rank[0][0], x))
            print(x)
            cou += 1
            if cou % 10000 == 0:
                print(cou)

    def get_relation_rank(self):
        cou = 0
        self.rank = []
        for triplet in self.triple_list_test:
            rank_list = {}
            for relation_temp in self.relation_list.keys():
                corrupted_triplet = (triplet[0], triplet[1], relation_temp)
                if self.is_fit and (corrupted_triplet in self.triple_list_train):
                    continue
                rank_list[relation_temp] = distance(self.entity_list[triplet[0]], self.entity_list[triplet[1]],
                                                    self.relation_list[relation_temp])
            name_rank = sorted(rank_list.items(), key=operator.itemgetter(1))
            x = 1
            for i in name_rank:
                if i[0] == triplet[2]:
                    break
                x += 1
            self.rank.append((triplet, triplet[2], name_rank[0][0], x))
            print(x)
            cou += 1
            if cou % 10000 == 0:
                print(cou)

    def get_mean_rank(self):
        num = 0
        for r in self.rank:
            num += r[3]
        return num / len(self.rank)
def distance(h, t, r):
    # L2 norm of h + r - t
    h = array(h)
    t = array(t)
    r = array(r)
    s = h + r - t
    return linalg.norm(s)


def openD(path, sp="\t"):
    # triple = (head, tail, relation)
    num = 0
    triples = []
    with open(path) as file:
        lines = file.readlines()
        for line in lines:
            triple = line.strip().split(sp)
            if len(triple) < 3:
                continue
            triples.append(tuple(triple))
            num += 1
    print(num)
    return num, triples


def load_data(path):
    # Each line has the form "<name>\t[v1, v2, ...]".
    with open(path) as fr:
        s_arr = [line.strip().split("\t") for line in fr.readlines()]
    dat_arr = [[float(s) for s in line[1][1:-1].split(", ")] for line in s_arr]
    name_arr = [line[0] for line in s_arr]
    return dat_arr, name_arr
if __name__ == '__main__':
    dir_train = "data/FB15k/train.txt"
    triple_num_train, triple_list_train = openD(dir_train)
    dir_test = "data/FB15k/test.txt"
    triple_num_test, triple_list_test = openD(dir_test)
    dir_entity_vector = "data/entityVector.txt"
    entity_vector_list, entity_list = load_data(dir_entity_vector)
    dir_relation_vector = "data/relationVector.txt"
    relation_vector_list, relation_list = load_data(dir_relation_vector)
    print("********** Start test... **********")

    test_head_raw = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
                         triple_list_test)
    test_head_raw.get_rank()
    print(test_head_raw.get_mean_rank())
    test_head_raw.write_rank("data/test/" + "test_head_raw" + ".txt")
    test_head_raw.get_relation_rank()
    print(test_head_raw.get_mean_rank())
    # The original path lacked the trailing slash after "data/test".
    test_head_raw.write_rank("data/test/" + "testRelationRaw" + ".txt")

    test_tail_raw = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
                         triple_list_test,
                         label="tail")
    test_tail_raw.get_rank()
    print(test_tail_raw.get_mean_rank())
    test_tail_raw.write_rank("data/test/" + "test_tail_raw" + ".txt")

    test_head_fit = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
                         triple_list_test,
                         is_fit=True)
    test_head_fit.get_rank()
    print(test_head_fit.get_mean_rank())
    test_head_fit.write_rank("data/test/" + "test_head_fit" + ".txt")
    test_head_fit.get_relation_rank()
    print(test_head_fit.get_mean_rank())
    test_head_fit.write_rank("data/test/" + "testRelationFit" + ".txt")

    test_tail_fit = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
                         triple_list_test,
                         is_fit=True, label="tail")
    test_tail_fit.get_rank()
    print(test_tail_fit.get_mean_rank())
    test_tail_fit.write_rank("data/test/" + "test_tail_fit" + ".txt")
\ No newline at end of file
from numpy import *
import operator
import logging
from TrainTransESimple import get_details_of_triplets_list
from multiprocessing import Queue, JoinableQueue, Process
import timeit

LOG_FORMAT = "%(asctime)s - %(name)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)


class Test:
    '''Basic evaluation procedure.

    Assume the knowledge base contains n entities. The evaluation then runs as follows:
    1. For each test triple a, replace its head (or tail) entity with every entity in
       the knowledge base in turn, which yields n triples.
    2. Compute the energy (the dist value) of each of these n triples; in TransE this
       is the norm of h + r - t. This gives n energy values, one per triple.
    3. Sort the n energy values in ascending order.
    4. Record the position of the original triple a in the sorted list.
    5. Repeat the steps above for every triple in the test set.
    6. Average the positions of the correct triples; this value is the Mean Rank.
    7. Compute the proportion of correct triples ranked in the top 10; this value is Hits@10.

    These are the two evaluation metrics: a lower Mean Rank is better and a higher
    Hits@10 is better. This code does not compute Hits@10, and Python is slow for
    this amount of computation. Readers are advised to use the Fast_TransX code from
    Tsinghua University instead: it is written in C++, performs well, and produces
    training and test results quickly.
    '''

    def __init__(self, entity_dyct, relation_dyct, train_triple_list,
                 test_triple_list,
                 label="head", is_fit=False, n_rank_calculator=24):
        self.entity_dyct = entity_dyct
        self.relation_dyct = relation_dyct
        self.train_triple_list = train_triple_list
        self.test_triple_list = test_triple_list
        self.rank = []
        self.label = label
        self.is_fit = is_fit
        self.hit_at_10 = 0
        self.count = 0
        self.n_rank_calculator = n_rank_calculator

    def write_rank(self, file_path):
        logging.info("Write ranks to %s" % file_path)
        with open(file_path, 'w') as file:
            for r in self.rank:
                file.write(str(r[0]) + "\t")
                file.write(str(r[1]) + "\t")
                file.write(str(r[2]) + "\t")
                file.write(str(r[3]) + "\n")
    def get_rank_part(self, triplet):
        rank_dyct = {}
        for ent in self.entity_dyct.keys():
            if self.label == "head":
                corrupted_triplet = (ent, triplet[1], triplet[2])
                if self.is_fit and (
                        corrupted_triplet in self.train_triple_list):
                    continue
                rank_dyct[ent] = distance(self.entity_dyct[ent], self.entity_dyct[triplet[1]],
                                          self.relation_dyct[triplet[2]])
            else:  # replace the head or the tail entity, depending on the label, and compute the distance
                corrupted_triplet = (triplet[0], ent, triplet[2])
                if self.is_fit and (
                        corrupted_triplet in self.train_triple_list):
                    continue
                rank_dyct[ent] = distance(self.entity_dyct[triplet