Commit d1eac725 authored by H haidfs

update multi process

Parent 74d5fdb4
# TransE
A Python implementation of the TransE method. For an explanation of TransE's vector update in SGD, see:
https://blog.csdn.net/weixin_42348333/article/details/89598144
This repository implements classic knowledge-graph algorithms from papers I have read:
1. TransE is a classic knowledge-representation algorithm for knowledge graphs; the repository contains the training code (a multi-process, inter-process-communication version) and the test code.
Code for further papers will be added as they are read.
2. TransE paper: https://www.utc.fr/~bordesan/dokuwiki/_media/en/transe_nips13.pdf
3. The code is annotated and modified from wuxiyu's TransE implementation, thanks for his work: https://github.com/wuxiyu/transE
4. TransE SGD explained: https://blog.csdn.net/weixin_42348333/article/details/89598144 (a hedged sketch of the update is given below)
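Item 4 above links to a post explaining the vector update in SGD. As a quick reference, here is a minimal, self-contained sketch of the margin-based ranking loss and a single gradient step; the function name `sgd_step` and its arguments are illustrative and not part of this repository's code (the actual update is batched in `TransE.update`):

```python
import numpy as np

def sgd_step(h, r, t, h_c, t_c, lr=0.01, margin=1.0):
    """One SGD step on a (positive, corrupted) triple pair, using the squared-L2
    energy d(h, r, t) = ||h + r - t||^2 (the repo's dist_L2). All arguments are
    1-D numpy arrays of the same dimension."""
    d_pos = np.sum((h + r - t) ** 2)
    d_neg = np.sum((h_c + r - t_c) ** 2)
    loss = margin + d_pos - d_neg
    if loss <= 0:                # hinge: nothing to update if the margin already holds
        return h, r, t, h_c, t_c, 0.0
    g_pos = 2 * (h + r - t)      # gradient of d_pos w.r.t. its residual
    g_neg = 2 * (h_c + r - t_c)
    h, t = h - lr * g_pos, t + lr * g_pos          # pull the positive triple together
    h_c, t_c = h_c + lr * g_neg, t_c - lr * g_neg  # push the corrupted triple apart
    r = r - lr * (g_pos - g_neg)
    return h, r, t, h_c, t_c, loss
```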
from numpy import *
import operator
class Test:
'''Basic evaluation procedure.
Suppose the knowledge base contains n entities in total. Evaluation then works as follows:
For each test triple a, replace its head (or tail) entity in turn with every other entity in the knowledge base, producing n triples.
Compute the energy of each of these n triples; in TransE this is the value of h + r - t, which gives n energy values, one per triple.
Sort the n energy values in ascending order.
Record the rank of the original triple a after sorting.
Repeat this for every triple in the test set.
Averaging the ranks of the correct triples gives the Mean Rank metric.
The proportion of correct triples ranked within the top 10 gives the Hits@10 metric.
So there are two metrics: Mean Rank (lower is better) and Hits@10 (higher is better). This code does not compute Hits@10 (a hedged sketch of such a helper is given after get_mean_rank below), and pure Python is slow for this amount of computation.
Readers are advised to use Tsinghua's Fast_TransX code instead: it is written in C++ and produces training and test results far more quickly.
'''
def __init__(self, entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
triple_list_test,
label="head", is_fit=False):
self.entity_list = {}
self.relation_list = {}
for name, vec in zip(entity_list, entity_vector_list):
self.entity_list[name] = vec
for name, vec in zip(relation_list, relation_vector_list):
self.relation_list[name] = vec
self.triple_list_train = triple_list_train
self.triple_list_test = triple_list_test
self.rank = []
self.label = label
self.is_fit = is_fit
def write_rank(self, dir):
print("Writing ranks to " + dir)
file = open(dir, 'w')
for r in self.rank:
file.write(str(r[0]) + "\t")
file.write(str(r[1]) + "\t")
file.write(str(r[2]) + "\t")
file.write(str(r[3]) + "\n")
file.close()
def get_rank(self):
cou = 0
for triplet in self.triple_list_test:
rank_list = {}
for entity_temp in self.entity_list.keys():
if self.label == "head":
corrupted_triplet = (entity_temp, triplet[1], triplet[2])
if self.is_fit and (corrupted_triplet in self.triple_list_train):
continue
rank_list[entity_temp] = distance(self.entity_list[entity_temp], self.entity_list[triplet[1]],
self.relation_list[triplet[2]])
else:  # depending on the label, replace the head or the tail entity and compute the distance
corrupted_triplet = (triplet[0], entity_temp, triplet[2])
if self.is_fit and (corrupted_triplet in self.triple_list_train):
continue
rank_list[entity_temp] = distance(self.entity_list[triplet[0]], self.entity_list[entity_temp],
self.relation_list[triplet[2]])
name_rank = sorted(rank_list.items(), key=operator.itemgetter(1))  # sort in ascending order by distance (the dict value)
if self.label == 'head':
num_tri = 0
else:
num_tri = 1
x = 1
for i in name_rank:
if i[0] == triplet[num_tri]:
break
x += 1
self.rank.append((triplet, triplet[num_tri], name_rank[0][0], x))
print(x)
cou += 1
if cou % 10000 == 0:
print(cou)
def get_relation_rank(self):
cou = 0
self.rank = []
for triplet in self.triple_list_test:
rank_list = {}
for relation_temp in self.relation_list.keys():
corrupted_triplet = (triplet[0], triplet[1], relation_temp)
if self.is_fit and (corrupted_triplet in self.triple_list_train):
continue
rank_list[relation_temp] = distance(self.entity_list[triplet[0]], self.entity_list[triplet[1]],
self.relation_list[relation_temp])
name_rank = sorted(rank_list.items(), key=operator.itemgetter(1))
x = 1
for i in name_rank:
if i[0] == triplet[2]:
break
x += 1
self.rank.append((triplet, triplet[2], name_rank[0][0], x))
print(x)
cou += 1
if cou % 10000 == 0:
print(cou)
def get_mean_rank(self):
num = 0
for r in self.rank:
num += r[3]
return num / len(self.rank)
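# A hedged sketch of the Hits@10 helper mentioned in the class docstring (not part of the
# original code; get_hits_at_10 is an illustrative name). Each entry of self.rank is
# (triple, correct_name, best_ranked_name, rank), so the metric is simply the fraction
# of entries ranked within the top 10:
#
#   def get_hits_at_10(self):
#       hits = sum(1 for r in self.rank if r[3] <= 10)
#       return hits / len(self.rank)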
def distance(h, t, r):
h = array(h)
t = array(t)
r = array(r)
s = h + r - t
return linalg.norm(s)
def openD(dir, sp="\t"):
# triple = (head, tail, relation)
num = 0
triple_list = []
with open(dir) as file:
lines = file.readlines()
for line in lines:
triple = line.strip().split(sp)
if len(triple) < 3:
continue
triple_list.append(tuple(triple))
num += 1
print(num)
return num, triple_list
def load_data(file_path):
fr = open(file_path)
s_arr = [line.strip().split("\t") for line in fr.readlines()]
dat_arr = [[float(s) for s in line[1][1:-1].split(", ")] for line in s_arr]
name_arr = [line[0] for line in s_arr]
return dat_arr, name_arr
if __name__ == '__main__':
dir_train = "data/FB15k/train.txt"
triple_num_train, triple_list_train = openD(dir_train)
dir_test = "data/FB15k/test.txt"
triple_num_test, triple_list_test = openD(dir_test)
dir_entity_vector = "data/entityVector.txt"
entity_vector_list, entity_list = load_data(dir_entity_vector)
dir_relation_vector = "data/relationVector.txt"
relation_vector_list, relation_list = load_data(dir_relation_vector)
print("********** Start test... **********")
test_head_raw = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
triple_list_test)
test_head_raw.get_rank()
print(test_head_raw.get_mean_rank())
test_head_raw.write_rank("data/test/" + "test_head_raw" + ".txt")
test_head_raw.get_relation_rank()
print(test_head_raw.get_mean_rank())
test_head_raw.write_rank("data/test/" + "testRelationRaw" + ".txt")
test_tail_raw = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
triple_list_test,
label="tail")
test_tail_raw.get_rank()
print(test_tail_raw.get_mean_rank())
test_tail_raw.write_rank("data/test/" + "test_tail_raw" + ".txt")
test_head_fit = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
triple_list_test,
is_fit=True)
test_head_fit.get_rank()
print(test_head_fit.get_mean_rank())
test_head_fit.write_rank("data/test/" + "test_head_fit" + ".txt")
test_head_fit.get_relation_rank()
print(test_head_fit.get_mean_rank())
test_head_fit.write_rank("data/test/" + "testRelationFit" + ".txt")
test_tail_fit = Test(entity_list, entity_vector_list, relation_list, relation_vector_list, triple_list_train,
triple_list_test,
is_fit=True, label="tail")
test_tail_fit.get_rank()
print(test_tail_fit.get_mean_rank())
test_tail_fit.write_rank("data/test/" + "test_tail_fit" + ".txt")
from random import uniform, sample, choice
import numpy as np
from copy import deepcopy
import time
from multiprocessing import Pool
from multiprocessing import Process, Value, Lock
from multiprocessing.managers import BaseManager
import multiprocessing
from numba import jit
def get_details_of_entityOrRels_list(file_path, split_delimeter="\t"):
@@ -92,19 +98,28 @@ class TransE(object):
self.rels_vector_dict = rels_vector_dict
def transE(self, cycle_index=20):
count = 0
print("\n********** Start TransE training **********")
for i in range(cycle_index):
if i % 100 == 0:
print("----------------The {} batchs----------------".format(i))
if count == 0:
start_time = time.time()
count += 1
if i % 10 == 0 and i != 0:
print("----------------The {} batches----------------".format(i))
print("The loss is: %.4f" % self.loss)
# record the loss so the final convergence behaviour can be inspected
self.loss_list.append(self.loss)
# self.write_vector("data/entityVector.txt", "entity")
# self.write_vector("data/relationVector.txt", "rels")
self.loss = 0
count = 0
end_time = time.time()
print("One epoch takes %.2f ms." % ((end_time - start_time) * 1000))
start_time = end_time
Sbatch = self.sample(150)
Sbatch = self.sample(1500)
Tbatch = []  # list of pairs (original triple, corrupted triple)
for sbatch in Sbatch:
triplets_with_corrupted_triplets = (sbatch, self.get_corrupted_triplets(sbatch))
@@ -179,12 +194,10 @@ class TransE(object):
if self.normal_form == "L1":
temp_positive_L1 = [1 if temp_positive[i] >= 0 else -1 for i in range(self.dim)]
temp_negative_L1 = [1 if temp_negative[i] >= 0 else -1 for i in range(self.dim)]
temp_positive_L1 = [float(f) for f in temp_positive_L1]
temp_negative_L1 = [float(f) for f in temp_negative_L1]
temp_positive = np.array(temp_positive_L1) * self.learning_rate
temp_negative = np.array(temp_negative_L1) * self.learning_rate
# temp_positive = norm(temp_positive_L1) * self.learning_rate
# temp_negative = norm(temp_negative_L1) * self.learning_rate
temp_positive = np.array(temp_positive_L1) * self.learning_rate
temp_negative = np.array(temp_negative_L1) * self.learning_rate
# gradient-descent update of the five vectors involved in the loss; the stochasticity of SGD comes from the sample() call
head_entity_vector += temp_positive
@@ -248,10 +261,13 @@ if __name__ == "__main__":
transE = TransE(entity_list, rels_list, triplets_list, margin=1, dim=50)
print("\nTransE is initializing...")
transE.initialize()
transE.transE(500000)
transE.transE(20000)
# transE.transE2(num_of_epochs=1000, epoch_triplets=15000, num_of_batches=10)
print("********** End TransE training ***********\n")
# The number of training batches is not necessarily a multiple of 100, so write the latest vectors to file here
transE.write_vector("data/entityVector.txt", "entity")
transE.write_vector("data/relationVector.txt", "relationship")
transE.write_loss("data/lossList_25cols.txt", 25)
transE.write_loss("data/lossList_1cols.txt", 1)
from random import uniform, sample, choice
import numpy as np
from copy import deepcopy
from multiprocessing import Process, Value, Lock
from multiprocessing.managers import BaseManager
import time
def get_details_of_entityOrRels_list(file_path, split_delimeter="\t"):
num_of_file = 0
lyst = []
with open(file_path) as file:
lines = file.readlines()
for line in lines:
details_and_id = line.strip().split(split_delimeter)
lyst.append(details_and_id[0])
num_of_file += 1
return num_of_file, lyst
def get_details_of_triplets_list(file_path, split_delimeter="\t"):
num_of_file = 0
lyst = []
with open(file_path) as file:
lines = file.readlines()
for line in lines:
triple = line.strip().split(split_delimeter)
if len(triple) < 3:
continue
lyst.append(tuple(triple))
num_of_file += 1
return num_of_file, lyst
def norm(lyst):
# normalize to a unit vector
var = np.linalg.norm(lyst)
i = 0
while i < len(lyst):
lyst[i] = lyst[i] / var
i += 1
# must return an ndarray, because a plain list does not support element-wise subtraction
# return list
return np.array(lyst)
def dist_L1(h, t, l):
s = h + l - t
# Manhattan (taxicab) distance: sum of the absolute values of the components, |x-xi| + |y-yi| + ...
# dist = np.fabs(s).sum()
return np.fabs(s).sum()
def dist_L2(h, t, l):
s = h + l - t
# squared Euclidean distance (the sum of squares is not square-rooted). Be careful: writing the normalization or distance formula incorrectly will make training fail to converge
# dist = (s * s).sum()
return (s * s).sum()
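# Illustrative sanity check (not part of the original file): with
#   h = np.array([1.0, 0.0]); t = np.array([0.0, 0.0]); l = np.array([0.0, 2.0])
# the residual h + l - t is [1., 2.], so dist_L1(h, t, l) == 3.0 and
# dist_L2(h, t, l) == 5.0 (the squared, un-rooted Euclidean norm).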
class TransE(object):
def __init__(self, entity_list, rels_list, triplets_list, margin=1, learning_rate=0.01, dim=50, normal_form="L1"):
self.learning_rate = learning_rate
self.loss = 0
self.entity_list = entity_list  # entity_list is a list of entity names; after initialize() the vectors are stored in entity_vector_dict (entity -> ndarray)
self.rels_list = rels_list
self.triplets_list = triplets_list
self.margin = margin
self.dim = dim
self.normal_form = normal_form
self.entity_vector_dict = {}
self.rels_vector_dict = {}
self.loss_list = []
def get_loss(self):
# Tsinghua's Fast-TransX C++ code is indeed very fast: training that takes Python close to 5 hours finishes in roughly ten-odd minutes in C++.
# A quick look at that code shows it modifies the paper's Sbatch: one epoch covers the full set of training triples, split into several batches,
# with every triple in a batch sampled at random, and SGD runs with n concurrent threads, one per batch.
# Because of Python's GIL (global interpreter lock), Python threads cannot saturate multiple CPU cores, so this file uses multiple processes instead.
# To share the model across processes, the TransE instance is wrapped in a manager Proxy object. A Proxy does not expose the wrapped object's
# attributes, so this getter is needed to read the loss from outside.
# Note that multiprocessing is not necessarily faster than a plain for-loop: it adds process creation/teardown and context-switch/RPC overhead
# (the manager communicates between processes to share the instance). Comparing trainTransE with trainTransE_MultiProcess, a group of 10 batches
# in the for-loop version takes about 8-9 s, while one epoch (one such group) of the multi-process version takes about 12-13 s.
# A further optimisation would be to reuse processes via a process pool (a hedged sketch is given next to func1 below), or to move to a framework such as TensorFlow.
return self.loss
def clear_loss(self):
# also needed so the loss can be reset to 0 from outside the Proxy object
self.loss = 0
def initialize(self):
"""A slightly modified version of the initialization in the paper.
Initialize l and e: each /m/06rf7-style string identifier from the entity/relation files is mapped to a dim-dimensional vector whose components are drawn with uniform() and then normalized with norm().
"""
entity_vector_dict, rels_vector_dict = {}, {}
entity_vector_compo_list, rels_vector_compo_list = [], []
for item, dict, compo_list, name in zip(
[self.entity_list, self.rels_list], [entity_vector_dict, rels_vector_dict],
[entity_vector_compo_list, rels_vector_compo_list], ["entity_vector_dict", "rels_vector_dict"]):
for entity_or_rel in item:
n = 0
compo_list = []
while n < self.dim:
random = uniform(-6 / (self.dim ** 0.5), 6 / (self.dim ** 0.5))
compo_list.append(random)
n += 1
compo_list = norm(compo_list)
dict[entity_or_rel] = compo_list
print("The " + name + "'s initialization is over. Its size is %d." % len(dict))
self.entity_vector_dict = entity_vector_dict
self.rels_vector_dict = rels_vector_dict
def transE(self, cycle_index=1, num=1500):
Sbatch = self.sample(num)
Tbatch = []  # list of pairs (original triple, corrupted triple)
for sbatch in Sbatch:
triplets_with_corrupted_triplets = (sbatch, self.get_corrupted_triplets(sbatch))
if triplets_with_corrupted_triplets not in Tbatch:
Tbatch.append(triplets_with_corrupted_triplets)
self.update(Tbatch)
def sample(self, size):
return sample(self.triplets_list, size)
def get_corrupted_triplets(self, triplets):
'''Training triple with either the head or the tail entity replaced by a random entity (but never both at the same time).
:param triplets: a single triple (h, t, l)
:return corrupted_triplets: the corrupted triple'''
coin = choice([True, False])
# The (h, t, l) comes from the training file, so corrupting it only requires drawing a random entity different from the one being replaced.
if coin:  # coin flip: True means corrupt the head entity (the first element)
while True:
searching_entity = sample(self.entity_vector_dict.keys(), 1)[0]  # sample() returns a list, so take its first element
if searching_entity != triplets[0]:
break
corrupted_triplets = (searching_entity, triplets[1], triplets[2])
else:  # otherwise corrupt the tail entity (the second element)
while True:
searching_entity = sample(self.entity_vector_dict.keys(), 1)[0]
if searching_entity != triplets[1]:
break
corrupted_triplets = (triplets[0], searching_entity, triplets[2])
return corrupted_triplets
def update(self, Tbatch):
entity_vector_copy = deepcopy(self.entity_vector_dict)
rels_vector_copy = deepcopy(self.rels_vector_dict)
for triplets_with_corrupted_triplets in Tbatch:
head_entity_vector = entity_vector_copy[triplets_with_corrupted_triplets[0][0]]
tail_entity_vector = entity_vector_copy[triplets_with_corrupted_triplets[0][1]]
relation_vector = rels_vector_copy[triplets_with_corrupted_triplets[0][2]]
head_entity_vector_with_corrupted_triplets = entity_vector_copy[triplets_with_corrupted_triplets[1][0]]
tail_entity_vector_with_corrupted_triplets = entity_vector_copy[triplets_with_corrupted_triplets[1][1]]
head_entity_vector_before_batch = self.entity_vector_dict[triplets_with_corrupted_triplets[0][0]]
tail_entity_vector_before_batch = self.entity_vector_dict[triplets_with_corrupted_triplets[0][1]]
relation_vector_before_batch = self.rels_vector_dict[triplets_with_corrupted_triplets[0][2]]
head_entity_vector_with_corrupted_triplets_before_batch = self.entity_vector_dict[
triplets_with_corrupted_triplets[1][0]]
tail_entity_vector_with_corrupted_triplets_before_batch = self.entity_vector_dict[
triplets_with_corrupted_triplets[1][1]]
if self.normal_form == "L1":
dist_triplets = dist_L1(head_entity_vector_before_batch, tail_entity_vector_before_batch,
relation_vector_before_batch)
dist_corrupted_triplets = dist_L1(head_entity_vector_with_corrupted_triplets_before_batch,
tail_entity_vector_with_corrupted_triplets_before_batch,
relation_vector_before_batch)
else:
dist_triplets = dist_L2(head_entity_vector_before_batch, tail_entity_vector_before_batch,
relation_vector_before_batch)
dist_corrupted_triplets = dist_L2(head_entity_vector_with_corrupted_triplets_before_batch,
tail_entity_vector_with_corrupted_triplets_before_batch,
relation_vector_before_batch)
eg = self.margin + dist_triplets - dist_corrupted_triplets
if eg > 0:  # hinge loss: only update when the margin-based ranking criterion is violated, otherwise the contribution is 0
self.loss += eg
temp_positive = 2 * self.learning_rate * (
tail_entity_vector_before_batch - head_entity_vector_before_batch - relation_vector_before_batch)
temp_negative = 2 * self.learning_rate * (
tail_entity_vector_with_corrupted_triplets_before_batch - head_entity_vector_with_corrupted_triplets_before_batch - relation_vector_before_batch)
if self.normal_form == "L1":
temp_positive_L1 = [1 if temp_positive[i] >= 0 else -1 for i in range(self.dim)]
temp_negative_L1 = [1 if temp_negative[i] >= 0 else -1 for i in range(self.dim)]
temp_positive_L1 = [float(f) for f in temp_positive_L1]
temp_negative_L1 = [float(f) for f in temp_negative_L1]
temp_positive = np.array(temp_positive_L1) * self.learning_rate
temp_negative = np.array(temp_negative_L1) * self.learning_rate
# temp_positive = norm(temp_positive_L1) * self.learning_rate
# temp_negative = norm(temp_negative_L1) * self.learning_rate
# gradient-descent update of the five vectors involved in the loss; the stochasticity of SGD comes from the sample() call
head_entity_vector += temp_positive
tail_entity_vector -= temp_positive
relation_vector = relation_vector + temp_positive - temp_negative
head_entity_vector_with_corrupted_triplets -= temp_negative
tail_entity_vector_with_corrupted_triplets += temp_negative
# normalize the vectors just updated, to reduce later computation time
entity_vector_copy[triplets_with_corrupted_triplets[0][0]] = norm(head_entity_vector)
entity_vector_copy[triplets_with_corrupted_triplets[0][1]] = norm(tail_entity_vector)
rels_vector_copy[triplets_with_corrupted_triplets[0][2]] = norm(relation_vector)
entity_vector_copy[triplets_with_corrupted_triplets[1][0]] = norm(
head_entity_vector_with_corrupted_triplets)
entity_vector_copy[triplets_with_corrupted_triplets[1][1]] = norm(
tail_entity_vector_with_corrupted_triplets)
# self.entity_vector_dict = deepcopy(entity_vector_copy)
# self.rels_vector_dict = deepcopy(rels_vector_copy)
self.entity_vector_dict = entity_vector_copy
self.rels_vector_dict = rels_vector_copy
def write_vector(self, file_path, option):
if option.strip().startswith("entit"):
print("Write entity vectors into file: {}".format(file_path))
# dyct = deepcopy(self.entity_vector_dict)
dyct = self.entity_vector_dict
if option.strip().startswith("rel"):
print("Write relationship vectors into file: {}".format(file_path))
# dyct = deepcopy(self.rels_vector_dict)
dyct = self.rels_vector_dict
with open(file_path, 'w') as file:  # overwrite the file on each call; with closes it automatically
for dyct_key in dyct.keys():
file.write(dyct_key + "\t")
file.write(str(dyct[dyct_key].tolist()))
file.write("\n")
def write_loss(self, file_path, num_of_col):
with open(file_path, 'w') as file:
lyst = deepcopy(self.loss_list)
for i in range(len(lyst)):
if num_of_col == 1:
# truncate to 4 decimal places
file.write(str(int(lyst[i] * 10000) / 10000) + "\n")
# file.write(str(lyst[i]).split('.')[0] + '.' + str(lyst[i]).split('.')[1][:4] + "\n")
else:
# file.write(str(lyst[i]).split('.')[0] + '.' + str(lyst[i]).split('.')[1][:4] + "\t")
file.write(str(int(lyst[i] * 10000) / 10000) + " ")
if (i + 1) % num_of_col == 0 and i != 0:
file.write("\n")
class MyManager(BaseManager):
pass
def Manager2():
m = MyManager()
m.start()
return m
MyManager.register('TransE', TransE)
def func1(em, lock, num):
with lock:
em.transE(num=num)
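# A hedged sketch of the "process pool" optimisation mentioned in get_loss() (not part of
# the original training flow; train_with_pool and its parameters are illustrative). It
# reuses func1 and the managed TransE proxy; the lock comes from a SyncManager so it can
# be passed to pool workers:
#
#   from multiprocessing import Pool, Manager
#
#   def train_with_pool(trans_e_proxy, epochs=20000, batches=10, batch_size=1500):
#       lock = Manager().Lock()
#       with Pool(processes=batches) as pool:
#           for _ in range(epochs):
#               results = [pool.apply_async(func1, (trans_e_proxy, lock, batch_size))
#                          for _ in range(batches)]
#               for r in results:
#                   r.get()  # re-raise any worker exception
#               print("The loss is %.4f" % trans_e_proxy.get_loss())
#               trans_e_proxy.clear_loss()
#
# Whether this beats spawning Process objects per epoch depends mostly on the
# manager-proxy RPC overhead, which dominates the cost in both variants.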
if __name__ == "__main__":
entity_file_path = "data/FB15k/entity2id.txt"
num_of_entity, entity_list = get_details_of_entityOrRels_list(entity_file_path)
rels_file_path = "data/FB15k/relation2id.txt"
num_of_rels, rels_list = get_details_of_entityOrRels_list(rels_file_path)
train_file_path = "data/FB15k/train.txt"
num_of_triplets, triplets_list = get_details_of_triplets_list(train_file_path)
manager = Manager2()
transE = manager.TransE(entity_list, rels_list, triplets_list, margin=1, dim=50)
print("\nTransE is initializing...")
transE.initialize()
print("\n********** Start TransE training **********")
for i in range(20000):  # number of epochs
start_time = time.time()
lock = Lock()
proces = [Process(target=func1, args=(transE, lock, 1500)) for j in range(10)]
for p in proces:
p.start()
for p in proces:
p.join()
print("The loss is %.4f" % transE.get_loss())
transE.clear_loss()
end_time = time.time()
print("Epoch %d (10 batches) took %.2f ms.\n" % (i, (end_time - start_time) * 1000))
# transE.transE(100000)
print("********** End TransE training ***********\n")
# The number of training epochs is not necessarily a multiple of 100, so write the latest vectors to file here
transE.write_vector("data/entityVector.txt", "entity")
transE.write_vector("data/relationVector.txt", "relationship")
transE.write_loss("data/lossList_25cols.txt", 25)
transE.write_loss("data/lossList_1cols.txt", 1)