Fix NCCL/Gloo process groups and DDP stream sync bug (#18465)
authorShen Li <shenli@fb.com>
Thu, 28 Mar 2019 22:05:53 +0000 (15:05 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 28 Mar 2019 22:12:40 +0000 (15:12 -0700)
Summary:
DDP with NCCL backend uses a [worker stream](https://github.com/pytorch/pytorch/blob/d3eb941ed96774efb8d89a0b20c9e49807ea85a7/torch/csrc/distributed/c10d/ddp.cpp#L142) to flatten grand batch
tensors, and passes the flattened tensor to [another stream](https://github.com/pytorch/pytorch/blob/d3eb941ed96774efb8d89a0b20c9e49807ea85a7/torch/lib/c10d/ProcessGroupNCCL.cpp#L379) to
conduct ncclAllReduce. The flattened tensor has to record the
ncclAllReduce stream, otherwise multiple streams might access the
same memory space.

cc ppwwyyxx
Pull Request resolved: https://github.com/pytorch/pytorch/pull/18465

Differential Revision: D14613449

Pulled By: mrshenli

fbshipit-source-id: b62773732552d12cc87b7adeb6897e9e11753ea9

torch/csrc/distributed/c10d/ddp.cpp
torch/lib/c10d/ProcessGroupGloo.cpp
torch/lib/c10d/ProcessGroupNCCL.cpp

index 6d33686..7673431 100644 (file)
@@ -145,8 +145,18 @@ std::tuple<std::shared_ptr<ProcessGroup::Work>, at::Tensor> queueReduction(
     events[devIdx].record();
     workerStreams.push_back(
         at::cuda::getStreamFromPool(false, devices[devIdx]));
-    // Let the worker stream to wait for the default stream
+    // Let worker streams to wait for default streams to make sure worker
+    // streams do not touch `gradsBatch` until all pending ops to create
+    // `gradBatch` finish.
     events[devIdx].block(workerStreams.back());
+
+    // Input `gradsBatch` are created on current streams and used in worker
+    // streams. Hence, they must record worker streams to prevent being
+    // freed before their worker stream ops finish.
+    for (at::Tensor& grad : gradsBatch[devIdx]) {
+      c10::cuda::CUDACachingAllocator::recordStream(
+        grad.storage().data(), workerStreams.back());
+    }
   }
 
   // Stream guards, now the current stream is the worker stream
@@ -179,6 +189,15 @@ void syncReduction(
   // and intra-node reduce to be operated on this worker stream to
   // improve performance
   at::cuda::CUDAStream workerStream = at::cuda::getStreamFromPool();
+
+  // Input `gradsBatch` are created on the current stream and used on the worker
+  // stream. Hence, they must record worker streams to prevent being freed
+  // before their worker stream ops finish.
+  for (at::Tensor& grad : gradsBatch) {
+    c10::cuda::CUDACachingAllocator::recordStream(
+      grad.storage().data(), workerStream);
+  }
+
   at::cuda::CUDAStreamGuard cudaGuard(workerStream);
 
   // Let the worker stream wait on the reduction stream
index 140c85e..d5c674c 100644 (file)
 
 #ifdef USE_CUDA
 #include <ATen/cuda/CUDAEvent.h>
-#include <c10/cuda/CUDAGuard.h>
-#include <c10/cuda/CUDAStream.h>
 #include <ATen/cuda/Exceptions.h>
 #include <ATen/cuda/PinnedMemoryAllocator.h>
+#include <c10/cuda/CUDACachingAllocator.h>
+#include <c10/cuda/CUDAGuard.h>
+#include <c10/cuda/CUDAStream.h>
 #endif
 
 #include <gloo/rendezvous/context.h>
@@ -150,6 +151,11 @@ void initializeStreamsEvents(
         /* isHighPriority */ true, tensors[i].device().index()));
     // Ensure the new stream is synchronized with the current stream.
     events[i].block(streams[i]);
+
+    // `tensors` are created on a different stream. Hence, they must record
+    // new streams in this Work to prevent being freed before the Work finishes.
+    c10::cuda::CUDACachingAllocator::recordStream(
+      tensors[i].storage().data(), streams[i]);
   }
 }
 
@@ -187,6 +193,14 @@ void initializeStreamsEvents(
         /* isHighPriority */ true, tensors[i][0].device().index()));
     // Ensure the new stream is synchronized with the current stream.
     events[i].block(streams[i]);
+
+    for (at::Tensor& tensor : tensors[i]) {
+      // `tensors` are created on a different stream. Hence, they must record
+      // new streams in this Work to prevent being freed before the Work
+      // finishes.
+      c10::cuda::CUDACachingAllocator::recordStream(
+        tensor.storage().data(), streams[i]);
+    }
   }
 }
 
index 5eedb89..486d2ce 100644 (file)
@@ -66,7 +66,19 @@ std::vector<at::Device> getDeviceList(const std::vector<at::Tensor>& tensors) {
   return res;
 }
 
-// Helper that lets the input ncclStreams to wait for the current stream
+// [Sync Streams] Helper that lets the input ncclStreams to wait for the current
+// stream. NCCL communications run on ncclStreams, but input tensors are
+// allocated on different streams (i.e., current streams). Communications on
+// ncclStreams cannot start before pending input tensor ops on current streams
+// finish. Otherwise, ops on two streams might read/write same tensors
+// concurrently.
+//
+// The synchronization above alone is not enough. We also need to make sure
+// input tensors are not freed before their usages on ncclStreams finish. This
+// can be achieved by calling c10::cuda::CUDACachingAllocator::recordStream,
+// which remembers the usage stream (ncclStream), creates an event on the usage
+// stream when GC attempts to free the input tensor, and delays GC until that
+// event is done.
 void syncStreams(
     const std::vector<at::Device>& devices,
     std::vector<at::cuda::CUDAEvent>& ncclEvents,
@@ -361,7 +373,7 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
   auto key = getKeyFromDevices(devices);
   auto& ncclComms = getNCCLComm(key, devices);
 
-  // First let NCCL streams wait for THC stream
+  // First let NCCL streams wait for input tensors allocation streams
   syncStreams(devices, ncclEvents_[key], ncclStreams_[key]);
 
   // Work itself will create the CUDA events on all GPUs of tensors
@@ -378,6 +390,12 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allreduce(
     gpuGuard.set_index(devices[i].index());
     at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
 
+    // Input `tensors` are created on a worker stream and used in different
+    // ncclStream. Hence, `tensors` must record the ncclStream to prevent being
+    // freed before ncclAllReduce finishes. See [Sync Streams].
+    c10::cuda::CUDACachingAllocator::recordStream(
+      tensors[i].storage().data(), ncclStream);
+
     C10D_NCCL_CHECK(ncclAllReduce(
         tensors[i].data_ptr(),
         tensors[i].data_ptr(),
@@ -427,6 +445,12 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::broadcast(
     // root rank of the the GPU
     int root = opts.rootRank * tensors.size() + opts.rootTensor;
 
+    // Input `tensors` are created on worker streams and used in different
+    // ncclStreams. Hence, `tensors` must record ncclStreams to prevent being
+    // freed before ncclBcast finishes. See [Sync Streams].
+    c10::cuda::CUDACachingAllocator::recordStream(
+      tensors[i].storage().data(), ncclStream);
+
     C10D_NCCL_CHECK(ncclBcast(
         tensors[i].data_ptr(),
         tensors[i].numel(),
@@ -475,6 +499,12 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::reduce(
     // root rank of the the GPU
     int root = opts.rootRank * tensors.size() + opts.rootTensor;
 
+    // Input `tensors` are created on worker streams and used in different
+    // ncclStreams. Hence, `tensors` must record ncclStreams to prevent being
+    // freed before ncclReduce finishes. See [Sync Streams].
+    c10::cuda::CUDACachingAllocator::recordStream(
+      tensors[i].storage().data(), ncclStream);
+
     C10D_NCCL_CHECK(ncclReduce(
         tensors[i].data_ptr(),
         tensors[i].data_ptr(),
@@ -543,6 +573,16 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allgather(
 
     at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
 
+    // Input `inputTensors` and `flattenOutputTensors` are created on worker
+    // streams and used in different ncclStreams. Hence, `tensors` must record
+    // ncclStreams to prevent beingfreed before ncclReduce finishes.
+    // See [Sync Streams].
+    c10::cuda::CUDACachingAllocator::recordStream(
+      inputTensors[i].storage().data(), ncclStream);
+
+    c10::cuda::CUDACachingAllocator::recordStream(
+      flattenOutputTensors[i].storage().data(), ncclStream);
+
     C10D_NCCL_CHECK(ncclAllGather(
         inputTensors[i].data_ptr(),
         flattenOutputTensors[i].data_ptr(),
@@ -559,6 +599,10 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::allgather(
     at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
     at::cuda::CUDAStreamGuard guard(ncclStream);
     for (size_t j = 0; j < outputTensors[0].size(); ++j) {
+      // See [Sync Streams].
+      c10::cuda::CUDACachingAllocator::recordStream(
+        outputTensors[i][i].storage().data(), ncclStream);
+
       outputTensors[i][j].copy_(flattenOutputTensors[i][j], true);
     }
   }