Record Caffe2's current stream ID in c10_cuda. (#15174)
authorEdward Yang <ezyang@fb.com>
Fri, 21 Dec 2018 05:51:25 +0000 (21:51 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 21 Dec 2018 05:54:05 +0000 (21:54 -0800)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/15174

Previously, Caffe2 maintained a separate per-thread per-device
current logical CUDA stream ID.  In this PR, we switch Caffe2 over
to using c10::Stream to manage the current stream, and also
manage the allocation of cudaStream_t objects.

This results in a slight behavior change: previously, Caffe2
would have been willing to allocate an arbitrary number of
CUDA streams, depending on how high the logical stream IDs
went.  The c10::Stream pool has a fixed number of streams, once
you exceed it, it wraps around.

Reviewed By: dzhulgakov

Differential Revision: D13451550

fbshipit-source-id: da6cf33ee026932a2d873835f6e090f7b8a7d8dc

caffe2/core/context_gpu.h
caffe2/core/context_gpu_test.cc
caffe2/python/operator_test/recurrent_net_executor_test.py

index d50c745..1ab3d4b 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <c10/Device.h>
 #include <c10/Stream.h>
+#include <c10/cuda/CUDAStream.h>
 
 namespace caffe2 {
 
@@ -46,6 +47,10 @@ CAFFE2_CUDA_API CudaMemoryPoolType GetCudaMemoryPoolType();
  * having the ThreadLocalCUDAObjects wrapper that takes care of allocating
  * and deallocating these objects at the thread scope. This class is solely
  * used inside CUDAContext and should not be used externally.
+ *
+ * This class manages the mapping from logical stream ID (int stream_id
+ * passed around in Caffe2) and CUDAStream objects.  We intend to eventually
+ * deprecate the logical stream ID interface, but not for now.
  */
 class CAFFE2_CUDA_API ThreadLocalCUDAObjects {
   friend class CUDAContext;
@@ -53,12 +58,7 @@ class CAFFE2_CUDA_API ThreadLocalCUDAObjects {
  private:
   ThreadLocalCUDAObjects() {
     for (DeviceIndex i = 0; i < CAFFE2_COMPILE_TIME_MAX_GPUS; ++i) {
-      cuda_streams_[i] = vector<cudaStream_t>();
-      cublas_handles_[i] = vector<cublasHandle_t>();
-#ifdef CAFFE2_USE_CUDNN
-      cudnn_handles_[i] = vector<cudnnHandle_t>();
-#endif // CAFFE2_USE_CUDNN
-      current_stream_id_[i] = 0;
+      cuda_streams_[i] = vector<c10::cuda::CUDAStream>();
     }
   }
 
@@ -68,53 +68,53 @@ class CAFFE2_CUDA_API ThreadLocalCUDAObjects {
   // CUDAContext::SwitchToDevice
   void SetCurrentStreamId(DeviceIndex gpu, StreamId stream_id) {
     // TODO: use current device id from thread local instead of passing gpu in
-    current_stream_id_[gpu] = stream_id;
+    c10::cuda::setCurrentCUDAStream(GetCUDAStream(gpu, stream_id));
+  }
+
+  // Retrieves the CUDAStream corresponding to a logical stream ID, ensuring
+  // that it exists in cuda_streams_ if it has not been allocated yet.
+  c10::cuda::CUDAStream GetCUDAStream(DeviceIndex gpu, StreamId stream_id) {
+    vector<c10::cuda::CUDAStream>& gpu_streams = cuda_streams_[gpu];
+    while (gpu_streams.size() <= static_cast<size_t>(stream_id)) {
+      // NB: This streams are not guaranteed to be unique; we'll
+      // wrap around once we run out of streams in the pool.
+      gpu_streams.emplace_back(c10::cuda::getStreamFromPool(/* high priority */ false, gpu));
+    }
+    return gpu_streams[stream_id];
   }
 
   // Uses the logical stream id from the thread local to pick the stream
   // We're going to migrate all usages to this case API instead of passing the
   // stream id directly
   cudaStream_t GetStream(DeviceIndex gpu) {
-    return GetStream(gpu, current_stream_id_[gpu]);
+    return c10::cuda::getCurrentCUDAStream(gpu).stream();
   }
 
   cudaStream_t GetStream(DeviceIndex gpu, StreamId stream_id) {
-    vector<cudaStream_t>& gpu_streams = cuda_streams_[gpu];
-    if (gpu_streams.size() <= static_cast<size_t>(stream_id)) {
-      gpu_streams.resize(stream_id + 1, nullptr);
-    }
-    if (!gpu_streams[stream_id]) {
-      DeviceGuard guard(gpu);
-      CUDA_ENFORCE(cudaStreamCreateWithFlags(
-          &gpu_streams[stream_id], cudaStreamNonBlocking));
-    }
-    return gpu_streams[stream_id];
+    return GetCUDAStream(gpu, stream_id).stream();
   }
 
   // Uses the logical stream id from the thread local to pick the stream
   // We're going to migrate all usages to this case API instead of passing the
   // stream id directly
   cublasHandle_t GetHandle(DeviceIndex gpu) {
-    return GetHandle(gpu, current_stream_id_[gpu]);
+    return GetHandle(c10::cuda::getCurrentCUDAStream(gpu));
   }
 
-  cublasHandle_t GetHandle(DeviceIndex gpu, StreamId stream_id) {
-    DeviceGuard guard(gpu);
-    vector<cublasHandle_t>& gpu_handles = cublas_handles_[gpu];
-    if (gpu_handles.size() <= (unsigned)stream_id) {
-      gpu_handles.resize(stream_id + 1, nullptr);
-    }
-    if (!gpu_handles[stream_id]) {
-      CUBLAS_ENFORCE(cublasCreate(&gpu_handles[stream_id]));
+  cublasHandle_t GetHandle(c10::cuda::CUDAStream cuda_stream) {
+    DeviceGuard guard(cuda_stream.device_index());
+    // Default construct in the map if it doesn't exist, and return a mutable
+    // refernce to it.
+    auto& r = cublas_handles_[cuda_stream];
+    if (r == nullptr) {
+      CUBLAS_ENFORCE(cublasCreate(&r));
       // The default is CUBLAS_POINTER_MODE_HOST. You can override
       // it after obtaining the cublas handle, but do that with
       // caution.
-      CUBLAS_ENFORCE(cublasSetPointerMode(
-          gpu_handles[stream_id], CUBLAS_POINTER_MODE_HOST));
-      CUBLAS_ENFORCE(
-          cublasSetStream(gpu_handles[stream_id], GetStream(gpu, stream_id)));
+      CUBLAS_ENFORCE(cublasSetPointerMode(r, CUBLAS_POINTER_MODE_HOST));
+      CUBLAS_ENFORCE(cublasSetStream(r, cuda_stream));
     }
-    return gpu_handles[stream_id];
+    return r;
   }
 
 #ifdef CAFFE2_USE_CUDNN
@@ -122,52 +122,42 @@ class CAFFE2_CUDA_API ThreadLocalCUDAObjects {
   // We're going to migrate all usages to this case API instead of passing the
   // stream id directly
   cudnnHandle_t GetCudnnHandle(DeviceIndex gpu) {
-    return GetCudnnHandle(gpu, current_stream_id_[gpu]);
+    return GetCudnnHandle(c10::cuda::getCurrentCUDAStream(gpu));
   }
 
-  cudnnHandle_t GetCudnnHandle(DeviceIndex gpu, StreamId stream_id) {
-    DeviceGuard guard(gpu);
-    vector<cudnnHandle_t>& gpu_handles = cudnn_handles_[gpu];
-    if (gpu_handles.size() <= (unsigned)stream_id) {
-      gpu_handles.resize(stream_id + 1, nullptr);
+  cudnnHandle_t GetCudnnHandle(c10::cuda::CUDAStream cuda_stream) {
+    DeviceGuard guard(cuda_stream.device_index());
+    auto& r = cudnn_handles_[cuda_stream];
+    if (r == nullptr) {
+      CUDNN_ENFORCE(cudnnCreate(&r));
+      CUDNN_ENFORCE(cudnnSetStream(r, cuda_stream));
     }
-    if (!gpu_handles[stream_id]) {
-      CUDNN_ENFORCE(cudnnCreate(&gpu_handles[stream_id]));
-      CUDNN_ENFORCE(
-          cudnnSetStream(gpu_handles[stream_id], GetStream(gpu, stream_id)));
-    }
-    return gpu_handles[stream_id];
+    return r;
   }
 #endif // CAFFE2_USE_CUDNN
 
   ~ThreadLocalCUDAObjects() noexcept {
-    for (int i = 0; i < CAFFE2_COMPILE_TIME_MAX_GPUS; ++i) {
-      for (auto& handle : cublas_handles_[i]) {
-        if (handle) {
-          CUBLAS_CHECK(cublasDestroy(handle));
-        }
-      }
-      for (auto& stream : cuda_streams_[i]) {
-        if (stream) {
-          CUDA_CHECK(cudaStreamDestroy(stream));
-        }
+    for (auto element : cublas_handles_) {
+      if (element.second) {
+        CUBLAS_CHECK(cublasDestroy(element.second));
       }
-
+    }
 #ifdef CAFFE2_USE_CUDNN
-      for (auto& handle : cudnn_handles_[i]) {
-        if (handle) {
-          CUDNN_CHECK(cudnnDestroy(handle));
-        }
+    for (auto element : cudnn_handles_) {
+      if (element.second) {
+        CUDNN_CHECK(cudnnDestroy(element.second));
       }
-#endif // CAFFE2_USE_CUDNN
     }
+#endif // CAFFE2_USE_CUDNN
   }
-  vector<cudaStream_t> cuda_streams_[CAFFE2_COMPILE_TIME_MAX_GPUS];
-  vector<cublasHandle_t> cublas_handles_[CAFFE2_COMPILE_TIME_MAX_GPUS];
+  // WARNING: mapping from logical stream ID to c10::cuda::CUDAStream
+  // is NOT bijective; multiple logical stream IDs may map to the
+  // same underlying stream ID.
+  vector<c10::cuda::CUDAStream> cuda_streams_[CAFFE2_COMPILE_TIME_MAX_GPUS];
+  std::unordered_map<c10::cuda::CUDAStream, cublasHandle_t> cublas_handles_;
 #ifdef CAFFE2_USE_CUDNN
-  vector<cudnnHandle_t> cudnn_handles_[CAFFE2_COMPILE_TIME_MAX_GPUS];
+  std::unordered_map<c10::cuda::CUDAStream, cudnnHandle_t> cudnn_handles_;
 #endif // CAFFE2_USE_CUDNN
-  int current_stream_id_[CAFFE2_COMPILE_TIME_MAX_GPUS];
 };
 
 class CAFFE2_CUDA_API CUDAContext final : public BaseContext {
index b59dcdb..e354f28 100644 (file)
@@ -132,6 +132,7 @@ namespace {
 // after thread exit.
 void TEST_GetStreamAddress(cudaStream_t* ptr) {
   CUDAContext context(0);
+  context.SwitchToDevice();
   *ptr = context.cuda_stream();
   // Sleep for a while so we have concurrent thread executions
   std::this_thread::sleep_for(std::chrono::seconds(1));
index c4d613b..f36c22a 100644 (file)
@@ -294,12 +294,12 @@ class TestRNNExecutor(unittest.TestCase):
         # start failing as this function will become defective.
         self.assertEqual(1 if forward_only else 2, num_found)
 
-    if __name__ == "__main__":
-        import unittest
-        import random
-        random.seed(2603)
-        workspace.GlobalInit([
-            'caffe2',
-            '--caffe2_log_level=0',
-            '--caffe2_rnn_executor=1'])
-        unittest.main()
+if __name__ == "__main__":
+    import unittest
+    import random
+    random.seed(2603)
+    workspace.GlobalInit([
+        'caffe2',
+        '--caffe2_log_level=0',
+        '--caffe2_rnn_executor=1'])
+    unittest.main()