# 分布式 RPC 框架入门 > 原文: **作者**:[Shen Li](https://mrshenli.github.io/) 先决条件: * [PyTorch 分布式概述](../beginner/dist_overview.html) * [RPC API 文档](https://pytorch.org/docs/master/rpc.html) 本教程使用两个简单的示例来演示如何使用[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)包构建分布式训练,该包首先在 PyTorch v1.4 中作为原型功能引入。 这两个示例的源代码可以在 [PyTorch 示例](https://github.com/pytorch/examples)中找到。 先前的教程[分布式数据并行入门](ddp_tutorial.html)和[使用 PyTorch](dist_tuto.html) 编写分布式应用,描述了[`DistributedDataParallel`](https://pytorch.org/docs/stable/_modules/torch/nn/parallel/distributed.html),该模型支持特定的训练范例,该模型可在多个进程之间复制,每个进程都处理输入数据的拆分。 有时,您可能会遇到需要不同训练范例的场景。 例如: 1. 在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能很小。 在这种情况下,产生多个并行运行的观察者并共享一个智能体可能会很有用。 在这种情况下,智能体将在本地负责训练,但是应用仍将需要库在观察者和训练者之间发送和接收数据。 2. 您的模型可能太大,无法容纳在一台计算机上的 GPU 中,因此需要一个库来帮助将模型拆分到多台计算机上。 或者,您可能正在实现[参数服务器](https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf)训练框架,其中模型参数和训练器位于不同的机器上。 [`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)包可以帮助解决上述情况。 在情况 1 中, [RPC](https://pytorch.org/docs/master/rpc.html#rpc) 和 [RRef](https://pytorch.org/docs/master/rpc.html#rref) 允许将数据从一个工作程序发送到另一个工作程序,同时轻松引用远程数据对象。 在情况 2 中,[分布式 Autograd](https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework) 和[分布式优化器](https://pytorch.org/docs/master/rpc.html#module-torch.distributed.optim)使执行反向传递和优化器步骤就像本地训练一样。 在接下来的两节中,我们将使用强化学习示例和语言模型示例来演示[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)的 API。 请注意,本教程并非旨在构建最准确或最有效的模型来解决给定的问题,相反,此处的主要目标是演示如何使用[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)包来构建分布式训练应用。 ## 使用 RPC 和 RRef 的分布式强化学习 本节介绍了使用 RPC 建立玩具分布式强化学习模型以解决 [OpenAI Gym](https://gym.openai.com) 中的 CartPole-v1 的步骤。 策略代码主要是从现有的单线程[示例](https://github.com/pytorch/examples/blob/master/reinforcement_learning)中借用的,如下所示。 我们将跳过`Policy`设计的详细信息,并将重点介绍 RPC 的用法。 ```py import torch.nn as nn import torch.nn.functional as F class Policy(nn.Module): def __init__(self): super(Policy, self).__init__() self.affine1 = nn.Linear(4, 128) self.dropout = nn.Dropout(p=0.6) self.affine2 = nn.Linear(128, 2) self.saved_log_probs = [] self.rewards = [] def forward(self, x): x = self.affine1(x) x = self.dropout(x) x = F.relu(x) action_scores = self.affine2(x) return F.softmax(action_scores, dim=1) ``` 首先,让我们准备一个助手,以在`RRef`的所有者工作程序上远程运行函数。 您将在本教程的示例中的多个地方发现该函数。 理想情况下,`torch.distributed.rpc`包应立即提供这些助手函数。 例如,如果应用可以直接调用`RRef.some_func(*arg)`,然后将其转换为`RRef`所有者的 RPC,将会更容易。 在[`pytorch/pytorch#31743`](https://github.com/pytorch/pytorch/issues/31743)中跟踪了此 API 的进度。 ```py from torch.distributed.rpc import rpc_sync def _call_method(method, rref, *args, **kwargs): return method(rref.local_value(), *args, **kwargs) def _remote_method(method, rref, *args, **kwargs): args = [method, rref] + list(args) return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs) # to call a function on an rref, we could do the following # _remote_method(some_func, rref, *args) ``` 我们准备介绍观察员。 在此示例中,每个观察者创建自己的环境,并等待智能体的命令来运行剧集。 在每个剧集中,一个观察者最多循环`n_steps`个迭代,并且在每个迭代中,它使用 RPC 将其环境状态传递给智能体并取回操作。 然后,它将该操作应用于其环境,并从环境中获取奖励和下一个状态。 之后,观察者使用另一个 RPC 向智能体报告奖励。 同样,请注意,这显然不是最有效的观察者实现。 例如,一个简单的优化可能是将当前状态和最后的报酬打包到一个 RPC 中,以减少通信开销。 但是,目标是演示 RPC API,而不是为 CartPole 构建最佳的求解器。 因此,在此示例中,让逻辑保持简单,并明确两个步骤。 ```py import argparse import gym import torch.distributed.rpc as rpc parser = argparse.ArgumentParser( description="RPC Reinforcement Learning Example", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument('--world_size', default=2, help='Number of workers') parser.add_argument('--log_interval', default=1, help='Log every log_interval episodes') parser.add_argument('--gamma', default=0.1, help='how much to value future rewards') parser.add_argument('--seed', default=1, help='random seed for reproducibility') args = parser.parse_args() class Observer: def __init__(self): self.id = rpc.get_worker_info().id self.env = gym.make('CartPole-v1') self.env.seed(args.seed) def run_episode(self, agent_rref, n_steps): state, ep_reward = self.env.reset(), 0 for step in range(n_steps): # send the state to the agent to get an action action = _remote_method(Agent.select_action, agent_rref, self.id, state) # apply the action to the environment, and get the reward state, reward, done, _ = self.env.step(action) # report the reward to the agent for training purpose _remote_method(Agent.report_reward, agent_rref, self.id, reward) if done: break ``` agent 的代码稍微复杂一点,我们将其分成多个部分。 在此示例中,智能体既充当训练者又充当主角色,以便它向多个分布式观察者发送命令以运行剧集,并且还记录本地的所有动作和奖励,这些动作和奖赏将在每个剧集之后的训练阶段使用。 下面的代码显示了`Agent`构造器,其中大多数行都在初始化各种组件。 最后的循环在其他工作器上远程初始化观察者,并在本地将`RRefs`保留给这些观察者。 智能体稍后将使用那些观察者`RRefs`发送命令。 应用无需担心`RRefs`的寿命。 每个`RRef`的所有者维护一个引用计数图以跟踪其生命周期,并保证只要该`RRef`的任何活动用户都不会删除远程数据对象。 有关详细信息,请参考`RRef` [设计文档](https://pytorch.org/docs/master/notes/rref.html)。 ```py import gym import numpy as np import torch import torch.distributed.rpc as rpc import torch.optim as optim from torch.distributed.rpc import RRef, rpc_async, remote from torch.distributions import Categorical class Agent: def __init__(self, world_size): self.ob_rrefs = [] self.agent_rref = RRef(self) self.rewards = {} self.saved_log_probs = {} self.policy = Policy() self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2) self.eps = np.finfo(np.float32).eps.item() self.running_reward = 0 self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold for ob_rank in range(1, world_size): ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank)) self.ob_rrefs.append(remote(ob_info, Observer)) self.rewards[ob_info.id] = [] self.saved_log_probs[ob_info.id] = [] ``` 接下来,智能体向观察者公开两个 API,以供他们选择动作和报告奖励。 这些函数仅在智能体上本地运行,但是将由观察者通过 RPC 触发。 ```py class Agent: ... def select_action(self, ob_id, state): state = torch.from_numpy(state).float().unsqueeze(0) probs = self.policy(state) m = Categorical(probs) action = m.sample() self.saved_log_probs[ob_id].append(m.log_prob(action)) return action.item() def report_reward(self, ob_id, reward): self.rewards[ob_id].append(reward) ``` 让我们在智能体上添加`run_episode`函数,该函数告诉所有观察者执行片段。 在此函数中,它首先创建一个列表,以从异步 RPC 收集期货,然后在所有观察者`RRefs`上循环以生成异步 RPC。 在这些 RPC 中,智能体还将自身的`RRef`传递给观察者,以便观察者也可以在智能体上调用函数。 如上所示,每个观察者都将 RPC 返回给智能体,它们是嵌套的 RPC。 在每个剧集之后,`saved_log_probs`和`rewards`将包含记录的动作概率和奖励。 ```py class Agent: ... def run_episode(self, n_steps=0): futs = [] for ob_rref in self.ob_rrefs: # make async RPC to kick off an episode on all observers futs.append( rpc_async( ob_rref.owner(), _call_method, args=(Observer.run_episode, ob_rref, self.agent_rref, n_steps) ) ) # wait until all obervers have finished this episode for fut in futs: fut.wait() ``` 最后,在一集之后,智能体需要训练模型,该模型在下面的`finish_episode`函数中实现。 此函数中没有 RPC,并且大多数是从单线程[示例](https://github.com/pytorch/examples/blob/master/reinforcement_learning)中借用的。 因此,我们跳过描述其内容。 ```py class Agent: ... def finish_episode(self): # joins probs and rewards from different observers into lists R, probs, rewards = 0, [], [] for ob_id in self.rewards: probs.extend(self.saved_log_probs[ob_id]) rewards.extend(self.rewards[ob_id]) # use the minimum observer reward to calculate the running reward min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards]) self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward # clear saved probs and rewards for ob_id in self.rewards: self.rewards[ob_id] = [] self.saved_log_probs[ob_id] = [] policy_loss, returns = [], [] for r in rewards[::-1]: R = r + args.gamma * R returns.insert(0, R) returns = torch.tensor(returns) returns = (returns - returns.mean()) / (returns.std() + self.eps) for log_prob, R in zip(probs, returns): policy_loss.append(-log_prob * R) self.optimizer.zero_grad() policy_loss = torch.cat(policy_loss).sum() policy_loss.backward() self.optimizer.step() return min_reward ``` 使用`Policy`,`Observer`和`Agent`类,我们准备启动多个过程来执行分布式训练。 在此示例中,所有进程都运行相同的`run_worker`函数,并且它们使用等级来区分其角色。 等级 0 始终是智能体,其他所有等级都是观察者。 智能体通过重复调用`run_episode`和`finish_episode`作为主设备,直到运行的奖励超过环境指定的奖励阈值为止。 所有观察者都被动地等待来自智能体的命令。 该代码由[`rpc.init_rpc`](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.init_rpc)和[`rpc.shutdown`](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.shutdown)包装,它们分别初始化和终止 RPC 实例。 [API 页面](https://pytorch.org/docs/master/rpc.html)中提供了更多详细信息。 ```py import os from itertools import count import torch.multiprocessing as mp AGENT_NAME = "agent" OBSERVER_NAME="obs" TOTAL_EPISODE_STEP = 100 def run_worker(rank, world_size): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '29500' if rank == 0: # rank0 is the agent rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size) agent = Agent(world_size) for i_episode in count(1): n_steps = int(TOTAL_EPISODE_STEP / (args.world_size - 1)) agent.run_episode(n_steps=n_steps) last_reward = agent.finish_episode() if i_episode % args.log_interval == 0: print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format( i_episode, last_reward, agent.running_reward)) if agent.running_reward > agent.reward_threshold: print("Solved! Running reward is now {}!".format(agent.running_reward)) break else: # other ranks are the observer rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size) # observers passively waiting for instructions from the agent # block until all rpcs finish, and shutdown the RPC instance rpc.shutdown() mp.spawn( run_worker, args=(args.world_size, ), nprocs=args.world_size, join=True ) ``` 以下是使用`world_size = 2`进行训练时的一些示例输出。 ```py Episode 10 Last reward: 26.00 Average reward: 10.01 Episode 20 Last reward: 16.00 Average reward: 11.27 Episode 30 Last reward: 49.00 Average reward: 18.62 Episode 40 Last reward: 45.00 Average reward: 26.09 Episode 50 Last reward: 44.00 Average reward: 30.03 Episode 60 Last reward: 111.00 Average reward: 42.23 Episode 70 Last reward: 131.00 Average reward: 70.11 Episode 80 Last reward: 87.00 Average reward: 76.51 Episode 90 Last reward: 86.00 Average reward: 95.93 Episode 100 Last reward: 13.00 Average reward: 123.93 Episode 110 Last reward: 33.00 Average reward: 91.39 Episode 120 Last reward: 73.00 Average reward: 76.38 Episode 130 Last reward: 137.00 Average reward: 88.08 Episode 140 Last reward: 89.00 Average reward: 104.96 Episode 150 Last reward: 97.00 Average reward: 98.74 Episode 160 Last reward: 150.00 Average reward: 100.87 Episode 170 Last reward: 126.00 Average reward: 104.38 Episode 180 Last reward: 500.00 Average reward: 213.74 Episode 190 Last reward: 322.00 Average reward: 300.22 Episode 200 Last reward: 165.00 Average reward: 272.71 Episode 210 Last reward: 168.00 Average reward: 233.11 Episode 220 Last reward: 184.00 Average reward: 195.02 Episode 230 Last reward: 284.00 Average reward: 208.32 Episode 240 Last reward: 395.00 Average reward: 247.37 Episode 250 Last reward: 500.00 Average reward: 335.42 Episode 260 Last reward: 500.00 Average reward: 386.30 Episode 270 Last reward: 500.00 Average reward: 405.29 Episode 280 Last reward: 500.00 Average reward: 443.29 Episode 290 Last reward: 500.00 Average reward: 464.65 Solved! Running reward is now 475.3163778435275! ``` 在此示例中,我们展示了如何使用 RPC 作为通信工具来跨工作器传递数据,以及如何使用 RRef 引用远程对象。 的确,您可以直接在`ProcessGroup` `send`和`recv` API 之上构建整个结构,也可以使用其他通信/ RPC 库。 但是,通过使用`torch.distributed.rpc`,您可以在后台获得本机支持并不断优化性能。 接下来,我们将展示如何将 RPC 和 RRef 与分布式 Autograd 和分布式优化器结合起来执行分布式模型并行训练。 ## 使用分布式 Autograd 和分布式优化器的分布式 RNN 在本节中,我们将使用 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。 示例 RNN 模型非常小,可以轻松地放入单个 GPU 中,但是我们仍将其层划分为两个不同的工作器来演示这一想法。 开发人员可以应用类似的技术在多个设备和机器上分布更大的模型。 RNN 模型设计是从 PyTorch [示例](https://github.com/pytorch/examples/tree/master/word_language_model)存储库中的词语言模型中借用的,该存储库包含三个主要组件,一个嵌入表,一个`LSTM`层和一个解码器。 下面的代码将嵌入表和解码器包装到子模块中,以便它们的构造器可以传递给 RPC API。 在`EmbeddingTable`子模块中,我们有意将`Embedding`层放在 GPU 上以涵盖用例。 在 v1.4 中,RPC 始终在目标工作线程上创建 CPU 张量参数或返回值。 如果函数使用 GPU 张量,则需要将其显式移动到适当的设备。 ```py class EmbeddingTable(nn.Module): r""" Encoding layers of the RNNModel """ def __init__(self, ntoken, ninp, dropout): super(EmbeddingTable, self).__init__() self.drop = nn.Dropout(dropout) self.encoder = nn.Embedding(ntoken, ninp).cuda() self.encoder.weight.data.uniform_(-0.1, 0.1) def forward(self, input): return self.drop(self.encoder(input.cuda()).cpu() class Decoder(nn.Module): def __init__(self, ntoken, nhid, dropout): super(Decoder, self).__init__() self.drop = nn.Dropout(dropout) self.decoder = nn.Linear(nhid, ntoken) self.decoder.bias.data.zero_() self.decoder.weight.data.uniform_(-0.1, 0.1) def forward(self, output): return self.decoder(self.drop(output)) ``` 使用上述子模块,我们现在可以使用 RPC 将它们组合在一起以创建 RNN 模型。 在下面的代码中,`ps`代表参数服务器,该服务器托管嵌入表和解码器的参数。 构造器使用[远程](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.remote) API 在参数服务器上创建`EmbeddingTable`对象和`Decoder`对象,并在本地创建`LSTM`子模块。 在前进过程中,训练器使用`EmbeddingTable` `RRef`查找远程子模块,然后使用 RPC 将输入数据传递到`EmbeddingTable`,并获取查找结果。 然后,它通过本地`LSTM`层运行嵌入,最后使用另一个 RPC 将输出发送到`Decoder`子模块。 通常,要实现分布式模型并行训练,开发人员可以将模型分为多个子模块,调用 RPC 远程创建子模块实例,并在必要时使用`RRef`查找它们。 正如您在下面的代码中看到的那样,它看起来与单机模型并行训练非常相似。 主要区别是用 RPC 函数替换了`Tensor.to(device)`。 ```py class RNNModel(nn.Module): def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5): super(RNNModel, self).__init__() # setup embedding table remotely self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout)) # setup LSTM locally self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout) # setup decoder remotely self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout)) def forward(self, input, hidden): # pass input to the remote embedding table and fetch emb tensor back emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input) output, hidden = self.rnn(emb, hidden) # pass output to the rremote decoder and get the decoded output back decoded = _remote_method(Decoder.forward, self.decoder_rref, output) return decoded, hidden ``` 在介绍分布式优化器之前,让我们添加一个辅助函数来生成模型参数的 RRef 列表,该列表将由分布式优化器使用。 在本地训练中,应用可以调用`Module.parameters()`来获取对所有参数张量的引用,并将其传递给本地优化器以进行后续更新。 但是,由于某些参数存在于远程计算机上,因此同一 API 在分布式训练方案中不起作用。 因此,分布式优化器不采用参数`Tensors`的列表,而是采用`RRefs`的列表,每个模型参数一个`RRef`用于本地和远程模型参数。 辅助函数非常简单,只需调用`Module.parameters()`并在每个参数上创建一个本地`RRef`。 ```py def _parameter_rrefs(module): param_rrefs = [] for param in module.parameters(): param_rrefs.append(RRef(param)) return param_rrefs ``` 然后,由于`RNNModel`包含三个子模块,因此我们需要调用`_parameter_rrefs` 3 次,并将其包装到另一个辅助函数中。 ```py class RNNModel(nn.Module): ... def parameter_rrefs(self): remote_params = [] # get RRefs of embedding table remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref)) # create RRefs for local parameters remote_params.extend(_parameter_rrefs(self.rnn)) # get RRefs of decoder remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref)) return remote_params ``` 现在,我们准备实现训练循环。 初始化模型参数后,我们创建`RNNModel`和`DistributedOptimizer`。 分布式优化器将采用参数`RRefs`的列表,查找所有不同的所有者工作器,并在每个所有者工作器上创建给定的本地优化器(即,在这种情况下,您也可以使用其他本地优化器`SGD`) 使用给定的参数(即`lr=0.05`)。 在训练循环中,它首先创建一个分布式 Autograd 上下文,这将帮助分布式 Autograd 引擎查找梯度和涉及的 RPC 发送/接收函数。 分布式 Autograd 引擎的设计详细信息可以在其[设计说明](https://pytorch.org/docs/master/notes/distributed_autograd.html)中找到。 然后,它像本地模型一样开始正向传播,并运行分布式后向传递。 对于后向分布,您只需要指定一个根列表,在这种情况下,就是损失`Tensor`。 分布式 Autograd 引擎将自动遍历分布式图并正确编写梯度。 接下来,它在分布式优化器上运行`step`函数,该函数将与所有涉及的本地优化器联系以更新模型参数。 与本地训练相比,一个较小的差异是您不需要运行`zero_grad()`,因为每个 Autograd 上下文都有专用的空间来存储梯度,并且在每次迭代创建上下文时,来自不同迭代的那些梯度不会累积到同一组`Tensors`。 ```py def run_trainer(): batch = 5 ntoken = 10 ninp = 2 nhid = 3 nindices = 3 nlayers = 4 hidden = ( torch.randn(nlayers, nindices, nhid), torch.randn(nlayers, nindices, nhid) ) model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers) # setup distributed optimizer opt = DistributedOptimizer( optim.SGD, model.parameter_rrefs(), lr=0.05, ) criterion = torch.nn.CrossEntropyLoss() def get_next_batch(): for _ in range(5): data = torch.LongTensor(batch, nindices) % ntoken target = torch.LongTensor(batch, ntoken) % nindices yield data, target # train for 10 iterations for epoch in range(10): for data, target in get_next_batch(): # create distributed autograd context with dist_autograd.context() as context_id: hidden[0].detach_() hidden[1].detach_() output, hidden = model(data, hidden) loss = criterion(output, target) # run distributed backward pass dist_autograd.backward(context_id, [loss]) # run distributed optimizer opt.step(context_id) # not necessary to zero grads since they are # accumulated into the distributed autograd context # which is reset every iteration. print("Training epoch {}".format(epoch)) ``` 最后,让我们添加一些粘合代码以启动参数服务器和训练器流程。 ```py def run_worker(rank, world_size): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '29500' if rank == 1: rpc.init_rpc("trainer", rank=rank, world_size=world_size) _run_trainer() else: rpc.init_rpc("ps", rank=rank, world_size=world_size) # parameter server do nothing pass # block until all rpcs finish rpc.shutdown() if __name__=="__main__": world_size = 2 mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True) ```