//   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/imperative/nccl_context.h"

#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
#include "paddle/fluid/imperative/all_reduce.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/gen_comm_id_helper.h"
#endif

#ifdef PADDLE_WITH_NCCL
#include <nccl.h>
#include "paddle/fluid/platform/dynload/nccl.h"
#endif

#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"

namespace paddle {
namespace framework {
class Variable;
}  // namespace framework
}  // namespace paddle

namespace paddle {
namespace imperative {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)

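// Broadcast the NCCL unique ids from the root trainer to all other trainers:
// the root sends the ids to every other endpoint, while non-root trainers
// receive them through the socket server identified by server_fd.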
void NCCLParallelContext::BcastNCCLId(
    std::vector<ncclUniqueId> &nccl_ids,  // NOLINT
    int root, int server_fd) {
  if (strategy_.local_rank_ == root) {
    std::vector<std::string> other_trainers;
    for (auto &ep : strategy_.trainer_endpoints_) {
      if (ep != strategy_.current_endpoint_) {
        other_trainers.push_back(ep);
      }
    }
    platform::SendBroadCastCommID(other_trainers, &nccl_ids);
  } else {
    platform::RecvBroadCastCommID(server_fd, strategy_.current_endpoint_,
                                  &nccl_ids);
  }
}

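// Initialize one NCCL communicator per ring: rank 0 generates a unique id for
// each ring, the ids are broadcast to all trainers, and a communicator plus
// the compute/comm synchronization events are created for every ring.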
void NCCLParallelContext::Init() {
  int server_fd = -1;

  std::vector<ncclUniqueId> nccl_ids;
  nccl_ids.resize(strategy_.nrings_);

  if (strategy_.local_rank_ == 0) {
    // generate the unique ncclid on the root worker
    for (size_t i = 0; i < nccl_ids.size(); ++i) {
      platform::dynload::ncclGetUniqueId(&nccl_ids[i]);
    }
  } else {
    // FIXME(wangxi): gloo already uses the rank-0 endpoint, so the socket
    // server is only created on the non-root trainers.
    server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_)
                    .socket();
  }
  BcastNCCLId(nccl_ids, 0, server_fd);

  int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
  for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) {
    VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
            << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id
            << " ring id: " << ring_id;
    // CreateComm assigns nccl_comm to the CUDADeviceContext for this ring_id
    platform::NCCLCommContext::Instance().CreateComm(
        &nccl_ids[ring_id], strategy_.nranks_, strategy_.local_rank_, gpu_id,
        ring_id);

    compute_events_.emplace_back(
        platform::CudaEventResourcePool::Instance().New(
            BOOST_GET_CONST(platform::CUDAPlace, place_).device));
    comm_events_.emplace_back(platform::CudaEventResourcePool::Instance().New(
        BOOST_GET_CONST(platform::CUDAPlace, place_).device));
  }
}

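// Same as Init(), but creates a single communicator bound to the given
// ring_id instead of one communicator per ring.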
void NCCLParallelContext::InitWithRingID(int ring_id) {
  int server_fd = -1;
  std::vector<ncclUniqueId> nccl_ids;
  nccl_ids.resize(1);

  if (strategy_.local_rank_ == 0) {
    // generate the unique ncclid on the root worker
    platform::dynload::ncclGetUniqueId(&nccl_ids[0]);
  } else {
    // FIXME(wangxi): gloo already uses the rank-0 endpoint, so the socket
    // server is only created on the non-root trainers.
    server_fd = platform::SocketServer::GetInstance(strategy_.current_endpoint_)
                    .socket();
  }
  BcastNCCLId(nccl_ids, 0, server_fd);

  int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
  VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
          << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id
          << " ring id: " << ring_id;
  // CreateComm assigns nccl_comm to the CUDADeviceContext for this ring_id
  platform::NCCLCommContext::Instance().CreateComm(
      &nccl_ids[0], strategy_.nranks_, strategy_.local_rank_, gpu_id, ring_id);

  compute_events_.emplace_back(platform::CudaEventResourcePool::Instance().New(
      BOOST_GET_CONST(platform::CUDAPlace, place_).device));
  comm_events_.emplace_back(platform::CudaEventResourcePool::Instance().New(
      BOOST_GET_CONST(platform::CUDAPlace, place_).device));
}

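// All-reduce `src` into `dst` over the communicator of ring_id, either on the
// calculation stream or on the dedicated communication stream.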
void NCCLParallelContext::AllReduceByStream(const framework::Variable &src,
                                            framework::Variable *dst,
                                            int ring_id, bool use_calc_stream) {
  PADDLE_ENFORCE_EQ(
      platform::is_gpu_place(place_), true,
      platform::errors::Unimplemented(
          "Dynamic graph mode does not support multi-CPU training yet."));
  AllReduce(src, dst, strategy_, ring_id, use_calc_stream);
}

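// Broadcast the tensor held by `src` in place from rank 0 to all ranks of
// ring_id on the communication stream.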
void NCCLParallelContext::Broadcast(framework::Variable *src, int ring_id) {
  VLOG(3) << "/// DEBUG /// start inter broadcast with ring_id: " << ring_id;
  framework::Tensor *src_tensor = src->GetMutable<framework::LoDTensor>();
  const auto &place = src_tensor->place();
  platform::NCCLComm *comm =
      platform::NCCLCommContext::Instance().Get(ring_id, place);
  gpuStream_t stream = comm->stream();

  void *src_ptr = src_tensor->data<void>();
  auto nccl_dtype = platform::ToNCCLDataType(src_tensor->type());
  PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclBcast(
      src_ptr, src_tensor->numel(), nccl_dtype, 0, comm->comm(), stream));
}

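// Return the device context that owns the communication stream of ring_id.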
paddle::platform::DeviceContext *NCCLParallelContext::GetDeviceContext(
    int ring_id) {
  return static_cast<platform::DeviceContext *>(
      platform::NCCLCommContext::Instance()
          .Get(ring_id, place_)
          ->dev_context());
}

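// Make the communication stream of ring_id wait until all work currently
// queued on the compute stream has finished.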
void NCCLParallelContext::WaitCompute(int ring_id) {
  PADDLE_ENFORCE_GE(ring_id, 0,
                    platform::errors::OutOfRange(
                        "ring id must be >= 0, but got %d", ring_id));
  PADDLE_ENFORCE_LT(ring_id, compute_events_.size(),
                    platform::errors::OutOfRange(
                        "ring id must be < compute events size, "
                        "but got ring id = %d, compute events size = %d",
                        ring_id, compute_events_.size()));

  auto compute_stream = static_cast<platform::CUDADeviceContext *>(
                            platform::DeviceContextPool::Instance().Get(place_))
                            ->stream();
  auto comm_stream =
      platform::NCCLCommContext::Instance().Get(ring_id, place_)->stream();
  auto event = compute_events_[ring_id].get();

// compute_stream-->event-->comm_stream
#ifdef PADDLE_WITH_HIP
  PADDLE_ENFORCE_GPU_SUCCESS(hipEventRecord(event, compute_stream));
  PADDLE_ENFORCE_GPU_SUCCESS(hipStreamWaitEvent(comm_stream, event, 0));
#else
  PADDLE_ENFORCE_GPU_SUCCESS(cudaEventRecord(event, compute_stream));
  PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamWaitEvent(comm_stream, event, 0));
#endif
}

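// Make the compute stream wait until all work currently queued on the
// communication stream of ring_id has finished.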
void NCCLParallelContext::WaitComm(int ring_id) {
  PADDLE_ENFORCE_GE(ring_id, 0,
                    platform::errors::OutOfRange(
                        "ring id must be >= 0, but got %d", ring_id));
  PADDLE_ENFORCE_LT(ring_id, comm_events_.size(),
                    platform::errors::OutOfRange(
                        "ring id must be < comm events size, "
                        "but got ring id = %d, comm events size = %d",
                        ring_id, comm_events_.size()));

  auto compute_stream = static_cast<platform::CUDADeviceContext *>(
                            platform::DeviceContextPool::Instance().Get(place_))
                            ->stream();
  auto comm_stream =
      platform::NCCLCommContext::Instance().Get(ring_id, place_)->stream();
  auto event = comm_events_[ring_id].get();

// comm_stream-->event-->compute_stream
#ifdef PADDLE_WITH_HIP
  PADDLE_ENFORCE_GPU_SUCCESS(hipEventRecord(event, comm_stream));
  PADDLE_ENFORCE_GPU_SUCCESS(hipStreamWaitEvent(compute_stream, event, 0));
#else
  PADDLE_ENFORCE_GPU_SUCCESS(cudaEventRecord(event, comm_stream));
  PADDLE_ENFORCE_GPU_SUCCESS(cudaStreamWaitEvent(compute_stream, event, 0));
#endif
}

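// Block the host until all work queued on the compute device context has
// completed.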
void NCCLParallelContext::SynchronizeCompute() {
  auto *compute_dev_ctx = static_cast<platform::CUDADeviceContext *>(
      platform::DeviceContextPool::Instance().Get(place_));
  compute_dev_ctx->Wait();
}

#endif

}  //  namespace imperative
}  //  namespace paddle