Switched multi-GPU to NCCL
authorCyprien Noel <cyprien.noel@gmail.com>
Fri, 6 Jan 2017 22:55:12 +0000 (14:55 -0800)
committerCyprien Noel <cyprien.noel@gmail.com>
Fri, 6 Jan 2017 22:56:39 +0000 (14:56 -0800)
48 files changed:
CMakeLists.txt
Makefile
Makefile.config.example
cmake/Dependencies.cmake
cmake/Modules/FindNCCL.cmake [new file with mode: 0644]
cmake/Summary.cmake
include/caffe/blob.hpp
include/caffe/common.hpp
include/caffe/data_reader.hpp [deleted file]
include/caffe/internal_thread.hpp
include/caffe/layer.hpp
include/caffe/layers/base_data_layer.hpp
include/caffe/layers/data_layer.hpp
include/caffe/layers/hdf5_data_layer.hpp
include/caffe/layers/python_layer.hpp
include/caffe/net.hpp
include/caffe/parallel.hpp
include/caffe/solver.hpp
include/caffe/syncedmem.hpp
include/caffe/util/math_functions.hpp
include/caffe/util/nccl.hpp [new file with mode: 0644]
src/caffe/blob.cpp
src/caffe/common.cpp
src/caffe/data_reader.cpp [deleted file]
src/caffe/internal_thread.cpp
src/caffe/layer.cpp
src/caffe/layers/base_data_layer.cpp
src/caffe/layers/base_data_layer.cu
src/caffe/layers/data_layer.cpp
src/caffe/layers/hdf5_data_layer.cpp
src/caffe/layers/hdf5_data_layer.cu
src/caffe/layers/image_data_layer.cpp
src/caffe/layers/window_data_layer.cpp
src/caffe/net.cpp
src/caffe/parallel.cpp
src/caffe/proto/caffe.proto
src/caffe/solver.cpp
src/caffe/solvers/adagrad_solver.cpp
src/caffe/solvers/nesterov_solver.cpp
src/caffe/solvers/sgd_solver.cpp
src/caffe/syncedmem.cpp
src/caffe/test/test_data_layer.cpp
src/caffe/test/test_gradient_based_solver.cpp
src/caffe/test/test_hdf5data_layer.cpp
src/caffe/util/blocking_queue.cpp
src/caffe/util/db_lmdb.cpp
src/caffe/util/math_functions.cu
tools/caffe.cpp

index da7142c..3af394f 100644 (file)
@@ -28,6 +28,7 @@ include(cmake/ConfigGen.cmake)
 # ---[ Options
 caffe_option(CPU_ONLY  "Build Caffe without CUDA support" OFF) # TODO: rename to USE_CUDA
 caffe_option(USE_CUDNN "Build Caffe with cuDNN library support" ON IF NOT CPU_ONLY)
+caffe_option(USE_NCCL "Build Caffe with NCCL library support" OFF)
 caffe_option(BUILD_SHARED_LIBS "Build shared libraries" ON)
 caffe_option(BUILD_python "Build Python wrapper" ON)
 set(python_version "2" CACHE STRING "Specify which Python version to use")
index ccc4d8b..65d08f7 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -328,6 +328,12 @@ ifeq ($(USE_CUDNN), 1)
        COMMON_FLAGS += -DUSE_CUDNN
 endif
 
+# NCCL acceleration configuration
+ifeq ($(USE_NCCL), 1)
+       LIBRARIES += nccl
+       COMMON_FLAGS += -DUSE_NCCL
+endif
+
 # configure IO libraries
 ifeq ($(USE_OPENCV), 1)
        COMMON_FLAGS += -DUSE_OPENCV
index 07bed63..541cf80 100644 (file)
@@ -94,6 +94,10 @@ LIBRARY_DIRS := $(PYTHON_LIB) /usr/local/lib /usr/lib
 # INCLUDE_DIRS += $(shell brew --prefix)/include
 # LIBRARY_DIRS += $(shell brew --prefix)/lib
 
+# NCCL acceleration switch (uncomment to build with NCCL)
+# https://github.com/NVIDIA/nccl (last tested version: v1.2.3-1+cuda8.0)
+# USE_NCCL := 1
+
 # Uncomment to use `pkg-config` to specify OpenCV library paths.
 # (Usually not necessary -- OpenCV libraries are normally installed in one of the above $LIBRARY_DIRS.)
 # USE_PKG_CONFIG := 1
index ae9ce8e..ba28a12 100644 (file)
@@ -67,6 +67,13 @@ if(NOT HAVE_CUDA)
   add_definitions(-DCPU_ONLY)
 endif()
 
+if(USE_NCCL)
+  find_package(NCCL REQUIRED)
+  include_directories(SYSTEM ${NCCL_INCLUDE_DIR})
+  list(APPEND Caffe_LINKER_LIBS ${NCCL_LIBRARIES})
+  add_definitions(-DUSE_NCCL)
+endif()
+
 # ---[ OpenCV
 if(USE_OPENCV)
   find_package(OpenCV QUIET COMPONENTS core highgui imgproc imgcodecs)
@@ -119,18 +126,18 @@ if(BUILD_python)
     find_package(NumPy 1.7.1)
     # Find the matching boost python implementation
     set(version ${PYTHONLIBS_VERSION_STRING})
-    
+
     STRING( REGEX REPLACE "[^0-9]" "" boost_py_version ${version} )
     find_package(Boost 1.46 COMPONENTS "python-py${boost_py_version}")
     set(Boost_PYTHON_FOUND ${Boost_PYTHON-PY${boost_py_version}_FOUND})
-    
+
     while(NOT "${version}" STREQUAL "" AND NOT Boost_PYTHON_FOUND)
       STRING( REGEX REPLACE "([0-9.]+).[0-9]+" "\\1" version ${version} )
-      
+
       STRING( REGEX REPLACE "[^0-9]" "" boost_py_version ${version} )
       find_package(Boost 1.46 COMPONENTS "python-py${boost_py_version}")
       set(Boost_PYTHON_FOUND ${Boost_PYTHON-PY${boost_py_version}_FOUND})
-      
+
       STRING( REGEX MATCHALL "([0-9.]+).[0-9]+" has_more_version ${version} )
       if("${has_more_version}" STREQUAL "")
         break()
diff --git a/cmake/Modules/FindNCCL.cmake b/cmake/Modules/FindNCCL.cmake
new file mode 100644 (file)
index 0000000..c884593
--- /dev/null
@@ -0,0 +1,26 @@
+set(NCCL_INC_PATHS
+    /usr/include
+    /usr/local/include
+    $ENV{NCCL_DIR}/include
+    )
+
+set(NCCL_LIB_PATHS
+    /lib
+    /lib64
+    /usr/lib
+    /usr/lib64
+    /usr/local/lib
+    /usr/local/lib64
+    $ENV{NCCL_DIR}/lib
+    )
+
+find_path(NCCL_INCLUDE_DIR NAMES nccl.h PATHS ${NCCL_INC_PATHS})
+find_library(NCCL_LIBRARIES NAMES nccl PATHS ${NCCL_LIB_PATHS})
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(NCCL DEFAULT_MSG NCCL_INCLUDE_DIR NCCL_LIBRARIES)
+
+if (NCCL_FOUND)
+  message(STATUS "Found NCCL    (include: ${NCCL_INCLUDE_DIR}, library: ${NCCL_LIBRARIES})")
+  mark_as_advanced(NCCL_INCLUDE_DIR NCCL_LIBRARIES)
+endif ()
index ba025cf..ed8c252 100644 (file)
@@ -117,6 +117,7 @@ function(caffe_print_configuration_summary)
   caffe_status("  USE_OPENCV        :   ${USE_OPENCV}")
   caffe_status("  USE_LEVELDB       :   ${USE_LEVELDB}")
   caffe_status("  USE_LMDB          :   ${USE_LMDB}")
+  caffe_status("  USE_NCCL          :   ${USE_NCCL}")
   caffe_status("  ALLOW_LMDB_NOLOCK :   ${ALLOW_LMDB_NOLOCK}")
   caffe_status("")
   caffe_status("Dependencies:")
index af360ac..2f59471 100644 (file)
@@ -220,6 +220,7 @@ class Blob {
   void set_cpu_data(Dtype* data);
   const int* gpu_shape() const;
   const Dtype* gpu_data() const;
+  void set_gpu_data(Dtype* data);
   const Dtype* cpu_diff() const;
   const Dtype* gpu_diff() const;
   Dtype* mutable_cpu_data();
index 3c6a076..4904d1d 100644 (file)
@@ -158,11 +158,14 @@ class Caffe {
   // Search from start_id to the highest possible device ordinal,
   // return the ordinal of the first available device.
   static int FindDevice(const int start_id = 0);
-  // Parallel training info
+  // Parallel training
   inline static int solver_count() { return Get().solver_count_; }
   inline static void set_solver_count(int val) { Get().solver_count_ = val; }
-  inline static bool root_solver() { return Get().root_solver_; }
-  inline static void set_root_solver(bool val) { Get().root_solver_ = val; }
+  inline static int solver_rank() { return Get().solver_rank_; }
+  inline static void set_solver_rank(int val) { Get().solver_rank_ = val; }
+  inline static bool multiprocess() { return Get().multiprocess_; }
+  inline static void set_multiprocess(bool val) { Get().multiprocess_ = val; }
+  inline static bool root_solver() { return Get().solver_rank_ == 0; }
 
  protected:
 #ifndef CPU_ONLY
@@ -172,8 +175,11 @@ class Caffe {
   shared_ptr<RNG> random_generator_;
 
   Brew mode_;
+
+  // Parallel training
   int solver_count_;
-  bool root_solver_;
+  int solver_rank_;
+  bool multiprocess_;
 
  private:
   // The private constructor to avoid duplicate instantiation.
diff --git a/include/caffe/data_reader.hpp b/include/caffe/data_reader.hpp
deleted file mode 100644 (file)
index 8ed5542..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-#ifndef CAFFE_DATA_READER_HPP_
-#define CAFFE_DATA_READER_HPP_
-
-#include <map>
-#include <string>
-#include <vector>
-
-#include "caffe/common.hpp"
-#include "caffe/internal_thread.hpp"
-#include "caffe/util/blocking_queue.hpp"
-#include "caffe/util/db.hpp"
-
-namespace caffe {
-
-/**
- * @brief Reads data from a source to queues available to data layers.
- * A single reading thread is created per source, even if multiple solvers
- * are running in parallel, e.g. for multi-GPU training. This makes sure
- * databases are read sequentially, and that each solver accesses a different
- * subset of the database. Data is distributed to solvers in a round-robin
- * way to keep parallel training deterministic.
- */
-class DataReader {
- public:
-  explicit DataReader(const LayerParameter& param);
-  ~DataReader();
-
-  inline BlockingQueue<Datum*>& free() const {
-    return queue_pair_->free_;
-  }
-  inline BlockingQueue<Datum*>& full() const {
-    return queue_pair_->full_;
-  }
-
- protected:
-  // Queue pairs are shared between a body and its readers
-  class QueuePair {
-   public:
-    explicit QueuePair(int size);
-    ~QueuePair();
-
-    BlockingQueue<Datum*> free_;
-    BlockingQueue<Datum*> full_;
-
-  DISABLE_COPY_AND_ASSIGN(QueuePair);
-  };
-
-  // A single body is created per source
-  class Body : public InternalThread {
-   public:
-    explicit Body(const LayerParameter& param);
-    virtual ~Body();
-
-   protected:
-    void InternalThreadEntry();
-    void read_one(db::Cursor* cursor, QueuePair* qp);
-
-    const LayerParameter param_;
-    BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
-
-    friend class DataReader;
-
-  DISABLE_COPY_AND_ASSIGN(Body);
-  };
-
-  // A source is uniquely identified by its layer name + path, in case
-  // the same database is read from two different locations in the net.
-  static inline string source_key(const LayerParameter& param) {
-    return param.name() + ":" + param.data_param().source();
-  }
-
-  const shared_ptr<QueuePair> queue_pair_;
-  shared_ptr<Body> body_;
-
-  static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
-
-DISABLE_COPY_AND_ASSIGN(DataReader);
-};
-
-}  // namespace caffe
-
-#endif  // CAFFE_DATA_READER_HPP_
index 6a8c5a0..0ba6766 100644 (file)
@@ -42,8 +42,8 @@ class InternalThread {
   bool must_stop();
 
  private:
-  void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count,
-      bool root_solver);
+  void entry(int device, Caffe::Brew mode, int rand_seed,
+      int solver_count, int solver_rank, bool multiprocess);
 
   shared_ptr<boost::thread> thread_;
 };
index 10f353f..30dbfd5 100644 (file)
@@ -38,7 +38,7 @@ class Layer {
    * layer.
    */
   explicit Layer(const LayerParameter& param)
-    : layer_param_(param), is_shared_(false) {
+    : layer_param_(param) {
       // Set phase and copy blobs (if there are any).
       phase_ = param.phase();
       if (layer_param_.blobs_size() > 0) {
@@ -66,7 +66,6 @@ class Layer {
    */
   void SetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {
-    InitMutex();
     CheckBlobCounts(bottom, top);
     LayerSetUp(bottom, top);
     Reshape(bottom, top);
@@ -93,30 +92,6 @@ class Layer {
       const vector<Blob<Dtype>*>& top) {}
 
   /**
-   * @brief Whether a layer should be shared by multiple nets during data
-   *        parallelism. By default, all layers except for data layers should
-   *        not be shared. data layers should be shared to ensure each worker
-   *        solver access data sequentially during data parallelism.
-   */
-  virtual inline bool ShareInParallel() const { return false; }
-
-  /** @brief Return whether this layer is actually shared by other nets.
-   *         If ShareInParallel() is true and using more than one GPU and the
-   *         net has TRAIN phase, then this function is expected return true.
-   */
-  inline bool IsShared() const { return is_shared_; }
-
-  /** @brief Set whether this layer is actually shared by other nets
-   *         If ShareInParallel() is true and using more than one GPU and the
-   *         net has TRAIN phase, then is_shared should be set true.
-   */
-  inline void SetShared(bool is_shared) {
-    CHECK(ShareInParallel() || !is_shared)
-        << type() << "Layer does not support sharing.";
-    is_shared_ = is_shared;
-  }
-
-  /**
    * @brief Adjust the shapes of top blobs and internal buffers to accommodate
    *        the shapes of the bottom blobs.
    *
@@ -428,19 +403,6 @@ class Layer {
   }
 
  private:
-  /** Whether this layer is actually shared by other nets*/
-  bool is_shared_;
-
-  /** The mutex for sequential forward if this layer is shared */
-  shared_ptr<boost::mutex> forward_mutex_;
-
-  /** Initialize forward_mutex_ */
-  void InitMutex();
-  /** Lock forward_mutex_ if this layer is shared */
-  void Lock();
-  /** Unlock forward_mutex_ if this layer is shared */
-  void Unlock();
-
   DISABLE_COPY_AND_ASSIGN(Layer);
 };  // class Layer
 
@@ -450,8 +412,6 @@ class Layer {
 template <typename Dtype>
 inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
     const vector<Blob<Dtype>*>& top) {
-  // Lock during forward to ensure sequential forward
-  Lock();
   Dtype loss = 0;
   Reshape(bottom, top);
   switch (Caffe::mode()) {
@@ -482,7 +442,6 @@ inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
   default:
     LOG(FATAL) << "Unknown caffe mode.";
   }
-  Unlock();
   return loss;
 }
 
index 2c49b73..925b019 100644 (file)
@@ -68,15 +68,16 @@ class BasePrefetchingDataLayer :
       const vector<Blob<Dtype>*>& top);
 
   // Prefetches batches (asynchronously if to GPU memory)
-  static const int PREFETCH_COUNT = 3;
+  static const int PREFETCH_COUNT = 4;  // matches the prefetch default in caffe.proto
 
  protected:
   virtual void InternalThreadEntry();
   virtual void load_batch(Batch<Dtype>* batch) = 0;
 
-  Batch<Dtype> prefetch_[PREFETCH_COUNT];
+  vector<shared_ptr<Batch<Dtype> > > prefetch_;
   BlockingQueue<Batch<Dtype>*> prefetch_free_;
   BlockingQueue<Batch<Dtype>*> prefetch_full_;
+  Batch<Dtype>* prefetch_current_;
 
   Blob<Dtype> transformed_data_;
 };
index 6c36179..dec5818 100644 (file)
@@ -4,7 +4,6 @@
 #include <vector>
 
 #include "caffe/blob.hpp"
-#include "caffe/data_reader.hpp"
 #include "caffe/data_transformer.hpp"
 #include "caffe/internal_thread.hpp"
 #include "caffe/layer.hpp"
@@ -29,9 +28,13 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
   virtual inline int MaxTopBlobs() const { return 2; }
 
  protected:
+  void Next();
+  bool Skip();
   virtual void load_batch(Batch<Dtype>* batch);
 
-  DataReader reader_;
+  shared_ptr<db::DB> db_;
+  shared_ptr<db::Cursor> cursor_;
+  uint64_t offset_;
 };
 
 }  // namespace caffe
index b04cf8e..650a3fb 100644 (file)
@@ -23,7 +23,7 @@ template <typename Dtype>
 class HDF5DataLayer : public Layer<Dtype> {
  public:
   explicit HDF5DataLayer(const LayerParameter& param)
-      : Layer<Dtype>(param) {}
+      : Layer<Dtype>(param), offset_() {}
   virtual ~HDF5DataLayer();
   virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
@@ -38,6 +38,9 @@ class HDF5DataLayer : public Layer<Dtype> {
   virtual inline int MinTopBlobs() const { return 1; }
 
  protected:
+  void Next();
+  bool Skip();
+
   virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
   virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
@@ -55,6 +58,7 @@ class HDF5DataLayer : public Layer<Dtype> {
   std::vector<shared_ptr<Blob<Dtype> > > hdf_blobs_;
   std::vector<unsigned int> data_permutation_;
   std::vector<unsigned int> file_permutation_;
+  uint64_t offset_;
 };
 
 }  // namespace caffe
index 66dbbdf..529b09c 100644 (file)
@@ -21,8 +21,8 @@ class PythonLayer : public Layer<Dtype> {
     // Disallow PythonLayer in MultiGPU training stage, due to GIL issues
     // Details: https://github.com/BVLC/caffe/issues/2936
     if (this->phase_ == TRAIN && Caffe::solver_count() > 1
-        && !ShareInParallel()) {
-      LOG(FATAL) << "PythonLayer is not implemented in Multi-GPU training";
+        && !Caffe::root_solver() && !Caffe::multiprocess()) {
+      LOG(FATAL) << "PythonLayer does not support CLI Multi-GPU, use train.py";
     }
     self_.attr("param_str") = bp::str(
         this->layer_param_.python_param().param_str());
index 493bdf2..d3c9306 100644 (file)
@@ -23,10 +23,9 @@ namespace caffe {
 template <typename Dtype>
 class Net {
  public:
-  explicit Net(const NetParameter& param, const Net* root_net = NULL);
+  explicit Net(const NetParameter& param);
   explicit Net(const string& param_file, Phase phase,
-      const int level = 0, const vector<string>* stages = NULL,
-      const Net* root_net = NULL);
+      const int level = 0, const vector<string>* stages = NULL);
   virtual ~Net() {}
 
   /// @brief Initialize a network with a NetParameter.
@@ -228,6 +227,31 @@ class Net {
   static bool StateMeetsRule(const NetState& state, const NetStateRule& rule,
       const string& layer_name);
 
+  // Invoked at specific points during an iteration
+  class Callback {
+   protected:
+    virtual void run(int layer) = 0;
+
+    template <typename T>
+    friend class Net;
+  };
+  const vector<Callback*>& before_forward() const { return before_forward_; }
+  void add_before_forward(Callback* value) {
+    before_forward_.push_back(value);
+  }
+  const vector<Callback*>& after_forward() const { return after_forward_; }
+  void add_after_forward(Callback* value) {
+    after_forward_.push_back(value);
+  }
+  const vector<Callback*>& before_backward() const { return before_backward_; }
+  void add_before_backward(Callback* value) {
+    before_backward_.push_back(value);
+  }
+  const vector<Callback*>& after_backward() const { return after_backward_; }
+  void add_after_backward(Callback* value) {
+    after_backward_.push_back(value);
+  }
+
  protected:
   // Helpers for Init.
   /// @brief Append a new top blob to the net.
@@ -306,9 +330,13 @@ class Net {
   size_t memory_used_;
   /// Whether to compute and display debug info for the net.
   bool debug_info_;
-  /// The root net that actually holds the shared layers in data parallelism
-  const Net* const root_net_;
-  DISABLE_COPY_AND_ASSIGN(Net);
+  // Callbacks
+  vector<Callback*> before_forward_;
+  vector<Callback*> after_forward_;
+  vector<Callback*> before_backward_;
+  vector<Callback*> after_backward_;
+
+DISABLE_COPY_AND_ASSIGN(Net);
 };
 
 
index 6c496c8..64bb48e 100644 (file)
@@ -1,8 +1,11 @@
 #ifndef CAFFE_PARALLEL_HPP_
 #define CAFFE_PARALLEL_HPP_
 
-#include <boost/date_time/posix_time/posix_time.hpp>
+#ifdef USE_NCCL
 
+#include <boost/thread.hpp>
+
+#include <string>
 #include <vector>
 
 #include "caffe/blob.hpp"
@@ -13,6 +16,7 @@
 #include "caffe/solver.hpp"
 #include "caffe/syncedmem.hpp"
 #include "caffe/util/blocking_queue.hpp"
+#include "caffe/util/nccl.hpp"
 
 namespace caffe {
 
@@ -51,7 +55,7 @@ class GPUParams : public Params<Dtype> {
   GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device);
   virtual ~GPUParams();
 
-  void configure(Solver<Dtype>* solver) const;
+  void Configure(Solver<Dtype>* solver) const;
 
  protected:
   using Params<Dtype>::size_;
@@ -59,58 +63,55 @@ class GPUParams : public Params<Dtype> {
   using Params<Dtype>::diff_;
 };
 
-class DevicePair {
- public:
-  DevicePair(int parent, int device)
-      : parent_(parent),
-        device_(device) {
-  }
-  inline int parent() {
-    return parent_;
-  }
-  inline int device() {
-    return device_;
-  }
-
-  // Group GPUs in pairs, by proximity depending on machine's topology
-  static void compute(const vector<int> devices, vector<DevicePair>* pairs);
-
- protected:
-  int parent_;
-  int device_;
-};
-
-// Synchronous data parallelism using map-reduce between local GPUs.
 template<typename Dtype>
-class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
-    public InternalThread {
+class NCCL : public GPUParams<Dtype>,
+             public Solver<Dtype>::Callback,
+             public Net<Dtype>::Callback {
  public:
-  explicit P2PSync(shared_ptr<Solver<Dtype> > root_solver,
-                   P2PSync<Dtype>* parent, const SolverParameter& param);
-  virtual ~P2PSync();
-
-  inline const shared_ptr<Solver<Dtype> >& solver() const {
-    return solver_;
-  }
-
-  void Run(const vector<int>& gpus);
-  void Prepare(const vector<int>& gpus,
-               vector<shared_ptr<P2PSync<Dtype> > >* syncs);
-  inline const int initial_iter() const { return initial_iter_; }
+  /**
+   * Single process version.
+   */
+  explicit NCCL(shared_ptr<Solver<Dtype> > solver);
+  /**
+   * In multi-process settings, first create a NCCL id (new_uid), then
+   * pass it to each process to create connected instances.
+   */
+  NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid);
+  ~NCCL();
+
+  boost::barrier* barrier();
+  void set_barrier(boost::barrier* value);
+
+  /**
+   * In single process settings, create instances without uids and
+   * call this to connect them.
+   */
+  static void InitSingleProcess(vector<NCCL<Dtype>*>* nccls);
+
+  static string new_uid();
+
+  /**
+   * Broadcast weights from rank 0 to other solvers.
+   */
+  void Broadcast();
+
+  /**
+   * Single process multi-GPU.
+   */
+  void Run(const vector<int>& gpus, const char* restore);
 
  protected:
-  void on_start();
+  void Init();
+  void on_start() {}
+  void run(int layer);  // Net callback
   void on_gradients_ready();
 
-  void InternalThreadEntry();
+  ncclComm_t comm_;
+  cudaStream_t stream_;
 
-  P2PSync<Dtype>* parent_;
-  vector<P2PSync<Dtype>*> children_;
-  BlockingQueue<P2PSync<Dtype>*> queue_;
-  const int initial_iter_;
-  Dtype* parent_grads_;
   shared_ptr<Solver<Dtype> > solver_;
-
+  // Should not be necessary, https://github.com/NVIDIA/nccl/issues/37
+  boost::barrier* barrier_;
   using Params<Dtype>::size_;
   using Params<Dtype>::data_;
   using Params<Dtype>::diff_;
@@ -118,4 +119,5 @@ class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
 
 }  // namespace caffe
 
-#endif
+#endif  // USE_NCCL
+#endif  // CAFFE_PARALLEL_HPP_
index eafcee3..a28d8cb 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "caffe/net.hpp"
 #include "caffe/solver_factory.hpp"
+#include "caffe/util/benchmark.hpp"
 
 namespace caffe {
 
@@ -40,9 +41,8 @@ typedef boost::function<SolverAction::Enum()> ActionCallback;
 template <typename Dtype>
 class Solver {
  public:
-  explicit Solver(const SolverParameter& param,
-      const Solver* root_solver = NULL);
-  explicit Solver(const string& param_file, const Solver* root_solver = NULL);
+  explicit Solver(const SolverParameter& param);
+  explicit Solver(const string& param_file);
   void Init(const SolverParameter& param);
   void InitTrainNet();
   void InitTestNets();
@@ -72,7 +72,7 @@ class Solver {
   inline const vector<shared_ptr<Net<Dtype> > >& test_nets() {
     return test_nets_;
   }
-  int iter() { return iter_; }
+  int iter() const { return iter_; }
 
   // Invoked at specific points during an iteration
   class Callback {
@@ -118,10 +118,6 @@ class Solver {
   vector<Dtype> losses_;
   Dtype smoothed_loss_;
 
-  // The root solver that holds root nets (actually containing shared layers)
-  // in data parallelism
-  const Solver* const root_solver_;
-
   // A function that can be set by a client of the Solver to provide indication
   // that it wants a snapshot saved and/or to exit early.
   ActionCallback action_request_function_;
@@ -129,31 +125,11 @@ class Solver {
   // True iff a request to stop early was received.
   bool requested_early_exit_;
 
-  DISABLE_COPY_AND_ASSIGN(Solver);
-};
+  // Timing information, handy to tune e.g. number of GPUs
+  Timer iteration_timer_;
+  float iterations_last_;
 
-/**
- * @brief Solver that only computes gradients, used as worker
- *        for multi-GPU training.
- */
-template <typename Dtype>
-class WorkerSolver : public Solver<Dtype> {
- public:
-  explicit WorkerSolver(const SolverParameter& param,
-      const Solver<Dtype>* root_solver = NULL)
-      : Solver<Dtype>(param, root_solver) {}
-
- protected:
-  void ApplyUpdate() {}
-  void SnapshotSolverState(const string& model_filename) {
-    LOG(FATAL) << "Should not be called on worker solver.";
-  }
-  void RestoreSolverStateFromBinaryProto(const string& state_file) {
-    LOG(FATAL) << "Should not be called on worker solver.";
-  }
-  void RestoreSolverStateFromHDF5(const string& state_file) {
-    LOG(FATAL) << "Should not be called on worker solver.";
-  }
+  DISABLE_COPY_AND_ASSIGN(Solver);
 };
 
 }  // namespace caffe
index 38ee466..a41066b 100644 (file)
@@ -44,14 +44,8 @@ inline void CaffeFreeHost(void* ptr, bool use_cuda) {
  */
 class SyncedMemory {
  public:
-  SyncedMemory()
-      : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED),
-        own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false),
-        gpu_device_(-1) {}
-  explicit SyncedMemory(size_t size)
-      : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED),
-        own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false),
-        gpu_device_(-1) {}
+  SyncedMemory();
+  explicit SyncedMemory(size_t size);
   ~SyncedMemory();
   const void* cpu_data();
   void set_cpu_data(void* data);
@@ -68,6 +62,8 @@ class SyncedMemory {
 #endif
 
  private:
+  void check_device();
+
   void to_cpu();
   void to_gpu();
   void* cpu_ptr_;
@@ -77,7 +73,7 @@ class SyncedMemory {
   bool own_cpu_data_;
   bool cpu_malloc_use_cuda_;
   bool own_gpu_data_;
-  int gpu_device_;
+  int device_;
 
   DISABLE_COPY_AND_ASSIGN(SyncedMemory);
 };  // class SyncedMemory
index 6f6d3fe..51068fe 100644 (file)
@@ -185,6 +185,11 @@ void caffe_gpu_add_scalar(const int N, const Dtype alpha, Dtype *X);
 template <typename Dtype>
 void caffe_gpu_scal(const int N, const Dtype alpha, Dtype *X);
 
+#ifndef CPU_ONLY
+template <typename Dtype>
+void caffe_gpu_scal(const int N, const Dtype alpha, Dtype* X, cudaStream_t str);
+#endif
+
 template <typename Dtype>
 void caffe_gpu_add(const int N, const Dtype* a, const Dtype* b, Dtype* y);
 
diff --git a/include/caffe/util/nccl.hpp b/include/caffe/util/nccl.hpp
new file mode 100644 (file)
index 0000000..e01fb74
--- /dev/null
@@ -0,0 +1,37 @@
+#ifndef CAFFE_UTIL_NCCL_H_
+#define CAFFE_UTIL_NCCL_H_
+#ifdef USE_NCCL
+
+#include <nccl.h>
+
+#include "caffe/common.hpp"
+
+#define NCCL_CHECK(condition) \
+{ \
+  ncclResult_t result = condition; \
+  CHECK_EQ(result, ncclSuccess) << " " \
+    << ncclGetErrorString(result); \
+}
+
+namespace caffe {
+
+namespace nccl {
+
+template <typename Dtype> class dataType;
+
+template<> class dataType<float> {
+ public:
+  static const ncclDataType_t type = ncclFloat;
+};
+template<> class dataType<double> {
+ public:
+  static const ncclDataType_t type = ncclDouble;
+};
+
+}  // namespace nccl
+
+}  // namespace caffe
+
+#endif  // end USE_NCCL
+
+#endif  // CAFFE_UTIL_NCCL_H_
index 4a34e4c..603e52f 100644 (file)
@@ -89,6 +89,12 @@ const Dtype* Blob<Dtype>::cpu_data() const {
 template <typename Dtype>
 void Blob<Dtype>::set_cpu_data(Dtype* data) {
   CHECK(data);
+  // Make sure CPU and GPU sizes remain equal
+  size_t size = count_ * sizeof(Dtype);
+  if (data_->size() != size) {
+    data_.reset(new SyncedMemory(size));
+    diff_.reset(new SyncedMemory(size));
+  }
   data_->set_cpu_data(data);
 }
 
@@ -99,6 +105,18 @@ const Dtype* Blob<Dtype>::gpu_data() const {
 }
 
 template <typename Dtype>
+void Blob<Dtype>::set_gpu_data(Dtype* data) {
+  CHECK(data);
+  // Make sure CPU and GPU sizes remain equal
+  size_t size = count_ * sizeof(Dtype);
+  if (data_->size() != size) {
+    data_.reset(new SyncedMemory(size));
+    diff_.reset(new SyncedMemory(size));
+  }
+  data_->set_gpu_data(data);
+}
+
+template <typename Dtype>
 const Dtype* Blob<Dtype>::cpu_diff() const {
   CHECK(diff_);
   return (const Dtype*)diff_->cpu_data();
index dee6816..4f6f9bc 100644 (file)
@@ -53,7 +53,7 @@ void GlobalInit(int* pargc, char*** pargv) {
 
 Caffe::Caffe()
     : random_generator_(), mode_(Caffe::CPU),
-      solver_count_(1), root_solver_(true) { }
+      solver_count_(1), solver_rank_(0), multiprocess_(false) { }
 
 Caffe::~Caffe() { }
 
@@ -106,7 +106,8 @@ void* Caffe::RNG::generator() {
 
 Caffe::Caffe()
     : cublas_handle_(NULL), curand_generator_(NULL), random_generator_(),
-    mode_(Caffe::CPU), solver_count_(1), root_solver_(true) {
+    mode_(Caffe::CPU),
+    solver_count_(1), solver_rank_(0), multiprocess_(false) {
   // Try to create a cublas handler, and report an error if failed (but we will
   // keep the program running as one might just want to run CPU code).
   if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) {
diff --git a/src/caffe/data_reader.cpp b/src/caffe/data_reader.cpp
deleted file mode 100644 (file)
index 9f019bb..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-#include <boost/thread.hpp>
-#include <map>
-#include <string>
-#include <vector>
-
-#include "caffe/common.hpp"
-#include "caffe/data_reader.hpp"
-#include "caffe/layers/data_layer.hpp"
-#include "caffe/proto/caffe.pb.h"
-
-namespace caffe {
-
-using boost::weak_ptr;
-
-map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
-static boost::mutex bodies_mutex_;
-
-DataReader::DataReader(const LayerParameter& param)
-    : queue_pair_(new QueuePair(  //
-        param.data_param().prefetch() * param.data_param().batch_size())) {
-  // Get or create a body
-  boost::mutex::scoped_lock lock(bodies_mutex_);
-  string key = source_key(param);
-  weak_ptr<Body>& weak = bodies_[key];
-  body_ = weak.lock();
-  if (!body_) {
-    body_.reset(new Body(param));
-    bodies_[key] = weak_ptr<Body>(body_);
-  }
-  body_->new_queue_pairs_.push(queue_pair_);
-}
-
-DataReader::~DataReader() {
-  string key = source_key(body_->param_);
-  body_.reset();
-  boost::mutex::scoped_lock lock(bodies_mutex_);
-  if (bodies_[key].expired()) {
-    bodies_.erase(key);
-  }
-}
-
-//
-
-DataReader::QueuePair::QueuePair(int size) {
-  // Initialize the free queue with requested number of datums
-  for (int i = 0; i < size; ++i) {
-    free_.push(new Datum());
-  }
-}
-
-DataReader::QueuePair::~QueuePair() {
-  Datum* datum;
-  while (free_.try_pop(&datum)) {
-    delete datum;
-  }
-  while (full_.try_pop(&datum)) {
-    delete datum;
-  }
-}
-
-//
-
-DataReader::Body::Body(const LayerParameter& param)
-    : param_(param),
-      new_queue_pairs_() {
-  StartInternalThread();
-}
-
-DataReader::Body::~Body() {
-  StopInternalThread();
-}
-
-void DataReader::Body::InternalThreadEntry() {
-  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
-  db->Open(param_.data_param().source(), db::READ);
-  shared_ptr<db::Cursor> cursor(db->NewCursor());
-  vector<shared_ptr<QueuePair> > qps;
-  try {
-    int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
-
-    // To ensure deterministic runs, only start running once all solvers
-    // are ready. But solvers need to peek on one item during initialization,
-    // so read one item, then wait for the next solver.
-    for (int i = 0; i < solver_count; ++i) {
-      shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
-      read_one(cursor.get(), qp.get());
-      qps.push_back(qp);
-    }
-    // Main loop
-    while (!must_stop()) {
-      for (int i = 0; i < solver_count; ++i) {
-        read_one(cursor.get(), qps[i].get());
-      }
-      // Check no additional readers have been created. This can happen if
-      // more than one net is trained at a time per process, whether single
-      // or multi solver. It might also happen if two data layers have same
-      // name and same source.
-      CHECK_EQ(new_queue_pairs_.size(), 0);
-    }
-  } catch (boost::thread_interrupted&) {
-    // Interrupted exception is expected on shutdown
-  }
-}
-
-void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
-  Datum* datum = qp->free_.pop();
-  // TODO deserialize in-place instead of copy?
-  datum->ParseFromString(cursor->value());
-  qp->full_.push(datum);
-
-  // go to the next iter
-  cursor->Next();
-  if (!cursor->valid()) {
-    DLOG(INFO) << "Restarting data prefetching from start.";
-    cursor->SeekToFirst();
-  }
-}
-
-}  // namespace caffe
index 104884e..11de497 100644 (file)
@@ -28,25 +28,27 @@ void InternalThread::StartInternalThread() {
   Caffe::Brew mode = Caffe::mode();
   int rand_seed = caffe_rng_rand();
   int solver_count = Caffe::solver_count();
-  bool root_solver = Caffe::root_solver();
+  int solver_rank = Caffe::solver_rank();
+  bool multiprocess = Caffe::multiprocess();
 
   try {
     thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
-          rand_seed, solver_count, root_solver));
+          rand_seed, solver_count, solver_rank, multiprocess));
   } catch (std::exception& e) {
     LOG(FATAL) << "Thread exception: " << e.what();
   }
 }
 
 void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed,
-    int solver_count, bool root_solver) {
+    int solver_count, int solver_rank, bool multiprocess) {
 #ifndef CPU_ONLY
   CUDA_CHECK(cudaSetDevice(device));
 #endif
   Caffe::set_mode(mode);
   Caffe::set_random_seed(rand_seed);
   Caffe::set_solver_count(solver_count);
-  Caffe::set_root_solver(root_solver);
+  Caffe::set_solver_rank(solver_rank);
+  Caffe::set_multiprocess(multiprocess);
 
   InternalThreadEntry();
 }
index 3b91289..684ae88 100644 (file)
@@ -1,27 +1,7 @@
-#include <boost/thread.hpp>
 #include "caffe/layer.hpp"
 
 namespace caffe {
 
-template <typename Dtype>
-void Layer<Dtype>::InitMutex() {
-  forward_mutex_.reset(new boost::mutex());
-}
-
-template <typename Dtype>
-void Layer<Dtype>::Lock() {
-  if (IsShared()) {
-    forward_mutex_->lock();
-  }
-}
-
-template <typename Dtype>
-void Layer<Dtype>::Unlock() {
-  if (IsShared()) {
-    forward_mutex_->unlock();
-  }
-}
-
 INSTANTIATE_CLASS(Layer);
 
 }  // namespace caffe
index 989319f..9414f6f 100644 (file)
@@ -36,9 +36,12 @@ template <typename Dtype>
 BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
     const LayerParameter& param)
     : BaseDataLayer<Dtype>(param),
-      prefetch_free_(), prefetch_full_() {
-  for (int i = 0; i < PREFETCH_COUNT; ++i) {
-    prefetch_free_.push(&prefetch_[i]);
+      prefetch_(param.has_data_param() ?
+                param.data_param().prefetch() : PREFETCH_COUNT),
+      prefetch_free_(), prefetch_full_(), prefetch_current_() {
+  for (int i = 0; i < prefetch_.size(); ++i) {
+    prefetch_[i].reset(new Batch<Dtype>());
+    prefetch_free_.push(prefetch_[i].get());
   }
 }
 
@@ -46,22 +49,23 @@ template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
   BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
+
   // Before starting the prefetch thread, we make cpu_data and gpu_data
   // calls so that the prefetch thread does not accidentally make simultaneous
   // cudaMalloc calls when the main thread is running. In some GPUs this
   // seems to cause failures if we do not so.
-  for (int i = 0; i < PREFETCH_COUNT; ++i) {
-    prefetch_[i].data_.mutable_cpu_data();
+  for (int i = 0; i < prefetch_.size(); ++i) {
+    prefetch_[i]->data_.mutable_cpu_data();
     if (this->output_labels_) {
-      prefetch_[i].label_.mutable_cpu_data();
+      prefetch_[i]->label_.mutable_cpu_data();
     }
   }
 #ifndef CPU_ONLY
   if (Caffe::mode() == Caffe::GPU) {
-    for (int i = 0; i < PREFETCH_COUNT; ++i) {
-      prefetch_[i].data_.mutable_gpu_data();
+    for (int i = 0; i < prefetch_.size(); ++i) {
+      prefetch_[i]->data_.mutable_gpu_data();
       if (this->output_labels_) {
-        prefetch_[i].label_.mutable_gpu_data();
+        prefetch_[i]->label_.mutable_gpu_data();
       }
     }
   }
@@ -88,6 +92,9 @@ void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
 #ifndef CPU_ONLY
       if (Caffe::mode() == Caffe::GPU) {
         batch->data_.data().get()->async_gpu_push(stream);
+        if (this->output_labels_) {
+          batch->label_.data().get()->async_gpu_push(stream);
+        }
         CUDA_CHECK(cudaStreamSynchronize(stream));
       }
 #endif
@@ -106,22 +113,18 @@ void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
-  Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
+  if (prefetch_current_) {
+    prefetch_free_.push(prefetch_current_);
+  }
+  prefetch_current_ = prefetch_full_.pop("Waiting for data");
   // Reshape to loaded data.
-  top[0]->ReshapeLike(batch->data_);
-  // Copy the data
-  caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
-             top[0]->mutable_cpu_data());
-  DLOG(INFO) << "Prefetch copied";
+  top[0]->ReshapeLike(prefetch_current_->data_);
+  top[0]->set_cpu_data(prefetch_current_->data_.mutable_cpu_data());
   if (this->output_labels_) {
     // Reshape to loaded labels.
-    top[1]->ReshapeLike(batch->label_);
-    // Copy the labels.
-    caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
-        top[1]->mutable_cpu_data());
+    top[1]->ReshapeLike(prefetch_current_->label_);
+    top[1]->set_cpu_data(prefetch_current_->label_.mutable_cpu_data());
   }
-
-  prefetch_free_.push(batch);
 }
 
 #ifdef CPU_ONLY
index 4056d36..64c621a 100644 (file)
@@ -7,23 +7,18 @@ namespace caffe {
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::Forward_gpu(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
-  Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
+  if (prefetch_current_) {
+    prefetch_free_.push(prefetch_current_);
+  }
+  prefetch_current_ = prefetch_full_.pop("Waiting for data");
   // Reshape to loaded data.
-  top[0]->ReshapeLike(batch->data_);
-  // Copy the data
-  caffe_copy(batch->data_.count(), batch->data_.gpu_data(),
-      top[0]->mutable_gpu_data());
+  top[0]->ReshapeLike(prefetch_current_->data_);
+  top[0]->set_gpu_data(prefetch_current_->data_.mutable_gpu_data());
   if (this->output_labels_) {
     // Reshape to loaded labels.
-    top[1]->ReshapeLike(batch->label_);
-    // Copy the labels.
-    caffe_copy(batch->label_.count(), batch->label_.gpu_data(),
-        top[1]->mutable_gpu_data());
+    top[1]->ReshapeLike(prefetch_current_->label_);
+    top[1]->set_gpu_data(prefetch_current_->label_.mutable_gpu_data());
   }
-  // Ensure the copy is synchronous wrt the host, so that the next batch isn't
-  // copied in meanwhile.
-  CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
-  prefetch_free_.push(batch);
 }
 
 INSTANTIATE_LAYER_GPU_FORWARD(BasePrefetchingDataLayer);
index 66e6301..0f1296b 100644 (file)
@@ -14,7 +14,10 @@ namespace caffe {
 template <typename Dtype>
 DataLayer<Dtype>::DataLayer(const LayerParameter& param)
   : BasePrefetchingDataLayer<Dtype>(param),
-    reader_(param) {
+    offset_() {
+  db_.reset(db::GetDB(param.data_param().backend()));
+  db_->Open(param.data_param().source(), db::READ);
+  cursor_.reset(db_->NewCursor());
 }
 
 template <typename Dtype>
@@ -27,7 +30,8 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {
   const int batch_size = this->layer_param_.data_param().batch_size();
   // Read a data point, and use it to initialize the top blob.
-  Datum& datum = *(reader_.full().peek());
+  Datum datum;
+  datum.ParseFromString(cursor_->value());
 
   // Use data_transformer to infer the expected blob shape from datum.
   vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
@@ -35,22 +39,44 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
   // Reshape top[0] and prefetch_data according to the batch_size.
   top_shape[0] = batch_size;
   top[0]->Reshape(top_shape);
-  for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
-    this->prefetch_[i].data_.Reshape(top_shape);
+  for (int i = 0; i < this->prefetch_.size(); ++i) {
+    this->prefetch_[i]->data_.Reshape(top_shape);
   }
-  LOG(INFO) << "output data size: " << top[0]->num() << ","
+  LOG_IF(INFO, Caffe::root_solver())
+      << "output data size: " << top[0]->num() << ","
       << top[0]->channels() << "," << top[0]->height() << ","
       << top[0]->width();
   // label
   if (this->output_labels_) {
     vector<int> label_shape(1, batch_size);
     top[1]->Reshape(label_shape);
-    for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
-      this->prefetch_[i].label_.Reshape(label_shape);
+    for (int i = 0; i < this->prefetch_.size(); ++i) {
+      this->prefetch_[i]->label_.Reshape(label_shape);
     }
   }
 }
 
+template <typename Dtype>
+bool DataLayer<Dtype>::Skip() {
+  int size = Caffe::solver_count();
+  int rank = Caffe::solver_rank();
+  bool keep = (offset_ % size) == rank ||
+              // In test mode, only rank 0 runs, so avoid skipping
+              this->layer_param_.phase() == TEST;
+  return !keep;
+}
+
+template<typename Dtype>
+void DataLayer<Dtype>::Next() {
+  cursor_->Next();
+  if (!cursor_->valid()) {
+    LOG_IF(INFO, Caffe::root_solver())
+        << "Restarting data prefetching from start.";
+    cursor_->SeekToFirst();
+  }
+  offset_++;
+}
+
 // This function is called on prefetch thread
 template<typename Dtype>
 void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
@@ -61,41 +87,41 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   CPUTimer timer;
   CHECK(batch->data_.count());
   CHECK(this->transformed_data_.count());
-
-  // Reshape according to the first datum of each batch
-  // on single input batches allows for inputs of varying dimension.
   const int batch_size = this->layer_param_.data_param().batch_size();
-  Datum& datum = *(reader_.full().peek());
-  // Use data_transformer to infer the expected blob shape from datum.
-  vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
-  this->transformed_data_.Reshape(top_shape);
-  // Reshape batch according to the batch_size.
-  top_shape[0] = batch_size;
-  batch->data_.Reshape(top_shape);
-
-  Dtype* top_data = batch->data_.mutable_cpu_data();
-  Dtype* top_label = NULL;  // suppress warnings about uninitialized variables
 
-  if (this->output_labels_) {
-    top_label = batch->label_.mutable_cpu_data();
-  }
+  Datum datum;
   for (int item_id = 0; item_id < batch_size; ++item_id) {
     timer.Start();
-    // get a datum
-    Datum& datum = *(reader_.full().pop("Waiting for data"));
+    while (Skip()) {
+      Next();
+    }
+    datum.ParseFromString(cursor_->value());
     read_time += timer.MicroSeconds();
-    timer.Start();
+
+    if (item_id == 0) {
+      // Reshape according to the first datum of each batch
+      // on single input batches allows for inputs of varying dimension.
+      // Use data_transformer to infer the expected blob shape from datum.
+      vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
+      this->transformed_data_.Reshape(top_shape);
+      // Reshape batch according to the batch_size.
+      top_shape[0] = batch_size;
+      batch->data_.Reshape(top_shape);
+    }
+
     // Apply data transformations (mirror, scale, crop...)
+    timer.Start();
     int offset = batch->data_.offset(item_id);
+    Dtype* top_data = batch->data_.mutable_cpu_data();
     this->transformed_data_.set_cpu_data(top_data + offset);
     this->data_transformer_->Transform(datum, &(this->transformed_data_));
     // Copy label.
     if (this->output_labels_) {
+      Dtype* top_label = batch->label_.mutable_cpu_data();
       top_label[item_id] = datum.label();
     }
     trans_time += timer.MicroSeconds();
-
-    reader_.free().push(const_cast<Datum*>(&datum));
+    Next();
   }
   timer.Stop();
   batch_timer.Stop();
index c957451..b9a071c 100644 (file)
@@ -125,27 +125,45 @@ void HDF5DataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
 }
 
 template <typename Dtype>
+bool HDF5DataLayer<Dtype>::Skip() {
+  int size = Caffe::solver_count();
+  int rank = Caffe::solver_rank();
+  bool keep = (offset_ % size) == rank ||
+              // In test mode, only rank 0 runs, so avoid skipping
+              this->layer_param_.phase() == TEST;
+  return !keep;
+}
+
+template<typename Dtype>
+void HDF5DataLayer<Dtype>::Next() {
+  if (++current_row_ == hdf_blobs_[0]->shape(0)) {
+    if (num_files_ > 1) {
+      ++current_file_;
+      if (current_file_ == num_files_) {
+        current_file_ = 0;
+        if (this->layer_param_.hdf5_data_param().shuffle()) {
+          std::random_shuffle(file_permutation_.begin(),
+                              file_permutation_.end());
+        }
+        DLOG(INFO) << "Looping around to first file.";
+      }
+      LoadHDF5FileData(
+        hdf_filenames_[file_permutation_[current_file_]].c_str());
+    }
+    current_row_ = 0;
+    if (this->layer_param_.hdf5_data_param().shuffle())
+      std::random_shuffle(data_permutation_.begin(), data_permutation_.end());
+  }
+  offset_++;
+}
+
+template <typename Dtype>
 void HDF5DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {
   const int batch_size = this->layer_param_.hdf5_data_param().batch_size();
-  for (int i = 0; i < batch_size; ++i, ++current_row_) {
-    if (current_row_ == hdf_blobs_[0]->shape(0)) {
-      if (num_files_ > 1) {
-        ++current_file_;
-        if (current_file_ == num_files_) {
-          current_file_ = 0;
-          if (this->layer_param_.hdf5_data_param().shuffle()) {
-            std::random_shuffle(file_permutation_.begin(),
-                                file_permutation_.end());
-          }
-          DLOG(INFO) << "Looping around to first file.";
-        }
-        LoadHDF5FileData(
-            hdf_filenames_[file_permutation_[current_file_]].c_str());
-      }
-      current_row_ = 0;
-      if (this->layer_param_.hdf5_data_param().shuffle())
-        std::random_shuffle(data_permutation_.begin(), data_permutation_.end());
+  for (int i = 0; i < batch_size; ++i) {
+    while (Skip()) {
+      Next();
     }
     for (int j = 0; j < this->layer_param_.top_size(); ++j) {
       int data_dim = top[j]->count() / top[j]->shape(0);
@@ -153,6 +171,7 @@ void HDF5DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
           &hdf_blobs_[j]->cpu_data()[data_permutation_[current_row_]
             * data_dim], &top[j]->mutable_cpu_data()[i * data_dim]);
     }
+    Next();
   }
 }
 
index 595d223..33eebd4 100644 (file)
@@ -17,24 +17,9 @@ template <typename Dtype>
 void HDF5DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {
   const int batch_size = this->layer_param_.hdf5_data_param().batch_size();
-  for (int i = 0; i < batch_size; ++i, ++current_row_) {
-    if (current_row_ == hdf_blobs_[0]->shape(0)) {
-      if (num_files_ > 1) {
-        current_file_ += 1;
-        if (current_file_ == num_files_) {
-          current_file_ = 0;
-          if (this->layer_param_.hdf5_data_param().shuffle()) {
-            std::random_shuffle(file_permutation_.begin(),
-                                file_permutation_.end());
-          }
-          DLOG(INFO) << "Looping around to first file.";
-        }
-        LoadHDF5FileData(
-            hdf_filenames_[file_permutation_[current_file_]].c_str());
-      }
-      current_row_ = 0;
-      if (this->layer_param_.hdf5_data_param().shuffle())
-        std::random_shuffle(data_permutation_.begin(), data_permutation_.end());
+  for (int i = 0; i < batch_size; ++i) {
+    while (Skip()) {
+      Next();
     }
     for (int j = 0; j < this->layer_param_.top_size(); ++j) {
       int data_dim = top[j]->count() / top[j]->shape(0);
@@ -42,6 +27,7 @@ void HDF5DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
           &hdf_blobs_[j]->cpu_data()[data_permutation_[current_row_]
             * data_dim], &top[j]->mutable_gpu_data()[i * data_dim]);
     }
+    Next();
   }
 }
 
index 7ee7dc4..ec0fc5b 100644 (file)
@@ -54,6 +54,11 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
     const unsigned int prefetch_rng_seed = caffe_rng_rand();
     prefetch_rng_.reset(new Caffe::RNG(prefetch_rng_seed));
     ShuffleImages();
+  } else {
+    if (this->phase_ == TRAIN && Caffe::solver_rank() > 0 &&
+        this->layer_param_.image_data_param().rand_skip() == 0) {
+      LOG(WARNING) << "Shuffling or skipping recommended for multi-GPU";
+    }
   }
   LOG(INFO) << "A total of " << lines_.size() << " images.";
 
@@ -77,8 +82,8 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
   const int batch_size = this->layer_param_.image_data_param().batch_size();
   CHECK_GT(batch_size, 0) << "Positive batch size required";
   top_shape[0] = batch_size;
-  for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
-    this->prefetch_[i].data_.Reshape(top_shape);
+  for (int i = 0; i < this->prefetch_.size(); ++i) {
+    this->prefetch_[i]->data_.Reshape(top_shape);
   }
   top[0]->Reshape(top_shape);
 
@@ -88,8 +93,8 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
   // label
   vector<int> label_shape(1, batch_size);
   top[1]->Reshape(label_shape);
-  for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
-    this->prefetch_[i].label_.Reshape(label_shape);
+  for (int i = 0; i < this->prefetch_.size(); ++i) {
+    this->prefetch_[i]->label_.Reshape(label_shape);
   }
 }
 
index 103dd4b..1bf3760 100644 (file)
@@ -173,8 +173,8 @@ void WindowDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
   CHECK_GT(crop_size, 0);
   const int batch_size = this->layer_param_.window_data_param().batch_size();
   top[0]->Reshape(batch_size, channels, crop_size, crop_size);
-  for (int i = 0; i < this->PREFETCH_COUNT; ++i)
-    this->prefetch_[i].data_.Reshape(
+  for (int i = 0; i < this->prefetch_.size(); ++i)
+    this->prefetch_[i]->data_.Reshape(
         batch_size, channels, crop_size, crop_size);
 
   LOG(INFO) << "output data size: " << top[0]->num() << ","
@@ -183,8 +183,8 @@ void WindowDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
   // label
   vector<int> label_shape(1, batch_size);
   top[1]->Reshape(label_shape);
-  for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
-    this->prefetch_[i].label_.Reshape(label_shape);
+  for (int i = 0; i < this->prefetch_.size(); ++i) {
+    this->prefetch_[i]->label_.Reshape(label_shape);
   }
 
   // data mean
index 644cb7e..aa9e8f2 100644 (file)
 namespace caffe {
 
 template <typename Dtype>
-Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
-    : root_net_(root_net) {
+Net<Dtype>::Net(const NetParameter& param) {
   Init(param);
 }
 
 template <typename Dtype>
 Net<Dtype>::Net(const string& param_file, Phase phase,
-    const int level, const vector<string>* stages,
-    const Net* root_net)
-    : root_net_(root_net) {
+    const int level, const vector<string>* stages) {
   NetParameter param;
   ReadNetParamsFromTextFileOrDie(param_file, &param);
   // Set phase, stages and level
@@ -47,8 +44,6 @@ Net<Dtype>::Net(const string& param_file, Phase phase,
 
 template <typename Dtype>
 void Net<Dtype>::Init(const NetParameter& in_param) {
-  CHECK(Caffe::root_solver() || root_net_)
-      << "root_net_ needs to be set for all non-root solvers";
   // Set phase from the state.
   phase_ = in_param.state().phase();
   // Filter layers based on their include/exclude rules and
@@ -74,9 +69,6 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
   top_id_vecs_.resize(param.layer_size());
   bottom_need_backward_.resize(param.layer_size());
   for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
-    // For non-root solvers, whether this layer is shared from root_net_.
-    bool share_from_root = !Caffe::root_solver()
-        && root_net_->layers_[layer_id]->ShareInParallel();
     // Inherit phase from net if unset.
     if (!param.layer(layer_id).has_phase()) {
       param.mutable_layer(layer_id)->set_phase(phase_);
@@ -89,13 +81,7 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
           << "propagate_down param must be specified "
           << "either 0 or bottom_size times ";
     }
-    if (share_from_root) {
-      LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
-      layers_.push_back(root_net_->layers_[layer_id]);
-      layers_[layer_id]->SetShared(true);
-    } else {
-      layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
-    }
+    layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
     layer_names_.push_back(layer_param.name());
     LOG_IF(INFO, Caffe::root_solver())
         << "Creating Layer " << layer_param.name();
@@ -134,19 +120,7 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
       }
     }
     // After this layer is connected, set it up.
-    if (share_from_root) {
-      // Set up size of top blobs using root_net_
-      const vector<Blob<Dtype>*>& base_top = root_net_->top_vecs_[layer_id];
-      const vector<Blob<Dtype>*>& this_top = this->top_vecs_[layer_id];
-      for (int top_id = 0; top_id < base_top.size(); ++top_id) {
-        this_top[top_id]->ReshapeLike(*base_top[top_id]);
-        LOG(INFO) << "Created top blob " << top_id << " (shape: "
-            << this_top[top_id]->shape_string() <<  ") for shared layer "
-            << layer_param.name();
-      }
-    } else {
-      layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
-    }
+    layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
     LOG_IF(INFO, Caffe::root_solver())
         << "Setting up " << layer_names_[layer_id];
     for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
@@ -546,10 +520,15 @@ Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
   CHECK_LT(end, layers_.size());
   Dtype loss = 0;
   for (int i = start; i <= end; ++i) {
-    // LOG(ERROR) << "Forwarding " << layer_names_[i];
+    for (int c = 0; c < before_forward_.size(); ++c) {
+      before_forward_[c]->run(i);
+    }
     Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]);
     loss += layer_loss;
     if (debug_info_) { ForwardDebugInfo(i); }
+    for (int c = 0; c < after_forward_.size(); ++c) {
+      after_forward_[c]->run(i);
+    }
   }
   return loss;
 }
@@ -591,11 +570,17 @@ void Net<Dtype>::BackwardFromTo(int start, int end) {
   CHECK_GE(end, 0);
   CHECK_LT(start, layers_.size());
   for (int i = start; i >= end; --i) {
+    for (int c = 0; c < before_backward_.size(); ++c) {
+      before_backward_[c]->run(i);
+    }
     if (layer_need_backward_[i]) {
       layers_[i]->Backward(
           top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]);
       if (debug_info_) { BackwardDebugInfo(i); }
     }
+    for (int c = 0; c < after_backward_.size(); ++c) {
+      after_backward_[c]->run(i);
+    }
   }
 }
 
index 5bc41c6..d943391 100644 (file)
@@ -1,16 +1,15 @@
-#ifndef CPU_ONLY
+#ifdef USE_NCCL
+
 #include <cuda_runtime.h>
-#endif
 #include <glog/logging.h>
 #include <stdio.h>
-
 #include <sstream>
 #include <string>
 #include <vector>
 
-#include "boost/thread.hpp"
 #include "caffe/caffe.hpp"
 #include "caffe/parallel.hpp"
+#include "caffe/sgd_solvers.hpp"
 
 namespace caffe {
 
@@ -68,15 +67,14 @@ static size_t total_size(const vector<Blob<Dtype>*>& params) {
 
 template<typename Dtype>
 Params<Dtype>::Params(shared_ptr<Solver<Dtype> > root_solver)
-    : size_(total_size<Dtype>(root_solver->net()->learnable_params())),
-      data_(),
-      diff_() {
+  : size_(total_size<Dtype>(root_solver->net()->learnable_params())),
+    data_(),
+    diff_() {
 }
 
 template<typename Dtype>
 GPUParams<Dtype>::GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device)
-    : Params<Dtype>(root_solver) {
-#ifndef CPU_ONLY
+  : Params<Dtype>(root_solver) {
   int initial_device;
   CUDA_CHECK(cudaGetDevice(&initial_device));
 
@@ -86,358 +84,288 @@ GPUParams<Dtype>::GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device)
 
   // Copy blob values
   const vector<Blob<Dtype>*>& net =
-      root_solver->net()->learnable_params();
+    root_solver->net()->learnable_params();
   apply_buffers(net, data_, size_, copy);
 
   CUDA_CHECK(cudaMalloc(&diff_, size_ * sizeof(Dtype)));
   caffe_gpu_set(size_, Dtype(0), diff_);
 
   CUDA_CHECK(cudaSetDevice(initial_device));
-#else
-  NO_GPU;
-#endif
 }
 
 template<typename Dtype>
 GPUParams<Dtype>::~GPUParams() {
-#ifndef CPU_ONLY
   CUDA_CHECK(cudaFree(data_));
   CUDA_CHECK(cudaFree(diff_));
-#endif
 }
 
 template<typename Dtype>
-void GPUParams<Dtype>::configure(Solver<Dtype>* solver) const {
+void GPUParams<Dtype>::Configure(Solver<Dtype>* solver) const {
   const vector<Blob<Dtype>*>& net =
-      solver->net()->learnable_params();
+    solver->net()->learnable_params();
   apply_buffers(net, data_, size_, replace_gpu);
   apply_buffers(net, diff_, size_, replace_gpu_diff);
 }
 
-void DevicePair::compute(const vector<int> devices, vector<DevicePair>* pairs) {
-#ifndef CPU_ONLY
-  vector<int> remaining(devices);
-
-  // Depth for reduction tree
-  int remaining_depth = static_cast<int>(ceil(log2(remaining.size())));
-
-  // Group GPUs by board
-  for (int d = 0; d < remaining_depth; ++d) {
-    for (int i = 0; i < remaining.size(); ++i) {
-      for (int j = i + 1; j < remaining.size(); ++j) {
-        cudaDeviceProp a, b;
-        CUDA_CHECK(cudaGetDeviceProperties(&a, remaining[i]));
-        CUDA_CHECK(cudaGetDeviceProperties(&b, remaining[j]));
-        if (a.isMultiGpuBoard && b.isMultiGpuBoard) {
-          if (a.multiGpuBoardGroupID == b.multiGpuBoardGroupID) {
-            pairs->push_back(DevicePair(remaining[i], remaining[j]));
-            DLOG(INFO) << "GPU board: " << remaining[i] << ":" << remaining[j];
-            remaining.erase(remaining.begin() + j);
-            break;
-          }
-        }
-      }
-    }
-  }
-  ostringstream s;
-  for (int i = 0; i < remaining.size(); ++i) {
-    s << (i ? ", " : "") << remaining[i];
-  }
-  DLOG(INFO) << "GPUs paired by boards, remaining: " << s.str();
-
-  // Group by P2P accessibility
-  remaining_depth = ceil(log2(remaining.size()));
-  for (int d = 0; d < remaining_depth; ++d) {
-    for (int i = 0; i < remaining.size(); ++i) {
-      for (int j = i + 1; j < remaining.size(); ++j) {
-        int access;
-        CUDA_CHECK(
-            cudaDeviceCanAccessPeer(&access, remaining[i], remaining[j]));
-        if (access) {
-          pairs->push_back(DevicePair(remaining[i], remaining[j]));
-          DLOG(INFO) << "P2P pair: " << remaining[i] << ":" << remaining[j];
-          remaining.erase(remaining.begin() + j);
-          break;
-        }
-      }
-    }
-  }
-  s.str("");
-  for (int i = 0; i < remaining.size(); ++i) {
-    s << (i ? ", " : "") << remaining[i];
-  }
-  DLOG(INFO) << "GPUs paired by P2P access, remaining: " << s.str();
-
-  // Group remaining
-  remaining_depth = ceil(log2(remaining.size()));
-  for (int d = 0; d < remaining_depth; ++d) {
-    for (int i = 0; i < remaining.size(); ++i) {
-      pairs->push_back(DevicePair(remaining[i], remaining[i + 1]));
-      DLOG(INFO) << "Remaining pair: " << remaining[i] << ":"
-                 << remaining[i + 1];
-      remaining.erase(remaining.begin() + i + 1);
-    }
-  }
+static int getDevice() {
+  int device = 0;
+  CUDA_CHECK(cudaGetDevice(&device));
+  return device;
+}
 
-  // Should only be the parent node remaining
-  CHECK_EQ(remaining.size(), 1);
+template<typename Dtype>
+NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver)
+  : GPUParams<Dtype>(solver, getDevice()),
+    comm_(), solver_(solver), barrier_() {
+  this->Configure(solver.get());
+  Init();
+}
 
-  pairs->insert(pairs->begin(), DevicePair(-1, remaining[0]));
+template<typename Dtype>
+NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid)
+  : GPUParams<Dtype>(solver, getDevice()),
+    solver_(solver), barrier_() {
+  this->Configure(solver.get());
+  Caffe::set_multiprocess(true);
+  ncclUniqueId nccl_uid;
+  memcpy(&nccl_uid, &uid[0], NCCL_UNIQUE_ID_BYTES);  // NOLINT(caffe/alt_fn)
+  NCCL_CHECK(ncclCommInitRank(&comm_,
+                              Caffe::solver_count(),
+                              nccl_uid,
+                              Caffe::solver_rank()));
+  Init();
+}
 
-  CHECK(pairs->size() == devices.size());
-  for (int i = 0; i < pairs->size(); ++i) {
-    CHECK((*pairs)[i].parent() != (*pairs)[i].device());
-    for (int j = i + 1; j < pairs->size(); ++j) {
-      CHECK((*pairs)[i].device() != (*pairs)[j].device());
-    }
+template<typename Dtype>
+void NCCL<Dtype>::Init() {
+  if (solver_->param().layer_wise_reduce()) {
+    CUDA_CHECK(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking));
   }
-#else
-  NO_GPU;
-#endif
 }
 
-//
-
 template<typename Dtype>
-P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver,
-                        P2PSync<Dtype>* parent, const SolverParameter& param)
-    : GPUParams<Dtype>(root_solver, param.device_id()),
-      parent_(parent),
-      children_(),
-      queue_(),
-      initial_iter_(root_solver->iter()),
-      solver_() {
-#ifndef CPU_ONLY
-  int initial_device;
-  CUDA_CHECK(cudaGetDevice(&initial_device));
-  const int self = param.device_id();
-  CUDA_CHECK(cudaSetDevice(self));
-
-  if (parent == NULL) {
-    solver_ = root_solver;
-  } else {
-    Caffe::set_root_solver(false);
-    solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
-    Caffe::set_root_solver(true);
+NCCL<Dtype>::~NCCL() {
+  if (solver_->param().layer_wise_reduce()) {
+    CUDA_CHECK(cudaStreamDestroy(stream_));
   }
-  this->configure(solver_.get());
-  solver_->add_callback(this);
-
-  if (parent) {
-    // Enable p2p access between devices
-    const int peer = parent->solver_->param().device_id();
-    int access;
-    CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
-    if (access) {
-      CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0));
-    } else {
-      LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer;
-    }
-    // Allocate receiving buffer on parent
-    CUDA_CHECK(cudaSetDevice(peer));
-    CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype)));
-    CUDA_CHECK(cudaSetDevice(self));
+  if (comm_) {
+    ncclCommDestroy(comm_);
   }
-
-  CUDA_CHECK(cudaSetDevice(initial_device));
-#else
-  NO_GPU;
-#endif
 }
 
 template<typename Dtype>
-P2PSync<Dtype>::~P2PSync() {
-#ifndef CPU_ONLY
-  int initial_device;
-  CUDA_CHECK(cudaGetDevice(&initial_device));
-  const int self = solver_->param().device_id();
-  CUDA_CHECK(cudaSetDevice(self));
-
-  if (parent_) {
-    CUDA_CHECK(cudaFree(parent_grads_));
-    const int peer = parent_->solver_->param().device_id();
-    int access;
-    CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
-    if (access) {
-      CUDA_CHECK(cudaDeviceDisablePeerAccess(peer));
-    }
-  }
-
-  CUDA_CHECK(cudaSetDevice(initial_device));
-#endif
+boost::barrier* NCCL<Dtype>::barrier() {
+  return barrier_;
+}
+template<typename Dtype>
+void NCCL<Dtype>::set_barrier(boost::barrier* value) {
+  barrier_ = value;
 }
 
 template<typename Dtype>
-void P2PSync<Dtype>::InternalThreadEntry() {
-  Caffe::SetDevice(solver_->param().device_id());
-  CHECK(Caffe::root_solver());
-  Caffe::set_root_solver(false);
-  // See if there is a defined seed and reset random state if so
-  if (solver_->param().random_seed() >= 0) {
-    // Fetch random seed and modulate by device ID to make sure
-    // everyone doesn't have the same seed.  We seem to have some
-    // solver instability if we have everyone with the same seed
-    Caffe::set_random_seed(
-        solver_->param().random_seed() + solver_->param().device_id());
+void NCCL<Dtype>::InitSingleProcess(vector<NCCL<Dtype>*>* nccls) {
+  ncclComm_t* comms = new ncclComm_t[nccls->size()];
+  int* gpu_list = new int[nccls->size()];
+  for (int i = 0; i < nccls->size(); ++i) {
+    gpu_list[i] = (*nccls)[i]->solver_->param().device_id();
+  }
+  NCCL_CHECK(ncclCommInitAll(comms, static_cast<int>(nccls->size()), gpu_list));
+  for (int i = 0; i < nccls->size(); ++i) {
+    (*nccls)[i]->comm_ = comms[i];
   }
-  solver_->Step(solver_->param().max_iter() - initial_iter_);
 }
 
 template<typename Dtype>
-void P2PSync<Dtype>::on_start() {
-#ifndef CPU_ONLY
-#ifdef DEBUG
-  int device;
-  CUDA_CHECK(cudaGetDevice(&device));
-  CHECK(device == solver_->param().device_id());
-#else
-//  CHECK(false);
-#endif
+string NCCL<Dtype>::new_uid() {
+  string uid;
+  uid.resize(NCCL_UNIQUE_ID_BYTES);
+  ncclUniqueId nccl_uid;
+  NCCL_CHECK(ncclGetUniqueId(&nccl_uid));
+  memcpy(&uid[0], &nccl_uid, NCCL_UNIQUE_ID_BYTES);  // NOLINT(caffe/alt_fn)
+  return uid;
+}
 
-  // Wait for update from parent
-  if (parent_) {
-    P2PSync<Dtype> *parent = queue_.pop();
-    CHECK(parent == parent_);
+template<typename Dtype>
+void NCCL<Dtype>::Broadcast() {
+  if (barrier_) {  // NULL in multi process case
+    barrier_->wait();
   }
-
-  // Update children
-  for (int i = children_.size() - 1; i >= 0; i--) {
-    Dtype* src = data_;
-    Dtype* dst = children_[i]->data_;
-
-#ifdef DEBUG
-    cudaPointerAttributes attributes;
-    CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
-    CHECK(attributes.device == device);
-    CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
-    CHECK(attributes.device == children_[i]->solver_->param().device_id());
-#endif
-
-    CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),
-        cudaMemcpyDeviceToDevice, cudaStreamDefault));
-    CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
-    children_[i]->queue_.push(this);
+  NCCL_CHECK(ncclBcast(data_, static_cast<int>(size_),
+                       nccl::dataType<Dtype>::type, 0,
+                       comm_, cudaStreamDefault));
+  if (barrier_) {
+    barrier_->wait();
   }
-#endif
 }
 
 template<typename Dtype>
-void P2PSync<Dtype>::on_gradients_ready() {
-#ifndef CPU_ONLY
+void NCCL<Dtype>::run(int layer) {
+  CHECK(solver_->param().layer_wise_reduce());
+  vector<shared_ptr<Blob<Dtype> > >& blobs =
+    solver_->net()->layers()[layer]->blobs();
 #ifdef DEBUG
-  int device;
-  CUDA_CHECK(cudaGetDevice(&device));
-  CHECK(device == solver_->param().device_id());
+  // Assert blobs are contiguous to reduce in one step (e.g. bias often small)
+  for (int i = 1; i < blobs.size(); ++i) {
+    CHECK_EQ(blobs[i - 1]->gpu_diff() + blobs[i - 1]->count(),
+             blobs[i + 0]->gpu_diff());
+  }
 #endif
+  if (blobs.size() > 0) {
+    // Make sure default stream is done computing gradients. Could be
+    // replaced by cudaEventRecord+cudaStreamWaitEvent to avoid
+    // blocking the default stream, but it's actually slower.
+    CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
 
-  // Sum children gradients as they appear in the queue
-  for (int i = 0; i < children_.size(); ++i) {
-    P2PSync<Dtype> *child = queue_.pop();
-    Dtype* src = child->parent_grads_;
-    Dtype* dst = diff_;
-
-#ifdef DEBUG
-    bool ok = false;
-    for (int j = 0; j < children_.size(); ++j) {
-      if (child == children_[j]) {
-        ok = true;
-      }
+    // Reduce asynchronously
+    int size = 0;
+    for (int i = 0; i < blobs.size(); ++i) {
+      size += blobs[i]->count();
     }
-    CHECK(ok);
-    cudaPointerAttributes attributes;
-    CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
-    CHECK(attributes.device == device);
-    CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
-    CHECK(attributes.device == device);
-#endif
-
-    caffe_gpu_add(size_, src, dst, dst);
+    if (barrier_) {  // NULL in multi process case
+      barrier_->wait();
+    }
+    NCCL_CHECK(ncclAllReduce(blobs[0]->mutable_gpu_diff(),
+                             blobs[0]->mutable_gpu_diff(),
+                             size,
+                             nccl::dataType<Dtype>::type,
+                             ncclSum, comm_, stream_));
+    caffe_gpu_scal(size, (Dtype) 1.0 / Caffe::solver_count(),
+                   blobs[0]->mutable_gpu_diff(), stream_);
   }
+}
 
-  // Send gradients to parent
-  if (parent_) {
-    Dtype* src = diff_;
-    Dtype* dst = parent_grads_;
-
-#ifdef DEBUG
-    cudaPointerAttributes attributes;
-    CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
-    CHECK(attributes.device == device);
-    CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
-    CHECK(attributes.device == parent_->solver_->param().device_id());
-#endif
-
-    CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),  //
-        cudaMemcpyDeviceToDevice, cudaStreamDefault));
-    CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
-    parent_->queue_.push(this);
+template<typename Dtype>
+void NCCL<Dtype>::on_gradients_ready() {
+  if (solver_->param().layer_wise_reduce()) {
+    CHECK_EQ(solver_->net()->params().size(),
+             solver_->net()->learnable_params().size())
+      << "Layer-wise reduce is not supported for nets with shared weights.";
+
+    // Make sure reduction is done before applying gradients
+    CUDA_CHECK(cudaStreamSynchronize(stream_));
   } else {
-    // Loss functions divide gradients by the batch size, so to compensate
-    // for split batch, the root solver divides by number of solvers.
-    caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_);
+    if (barrier_) {  // NULL in multi process case
+      barrier_->wait();
+    }
+    NCCL_CHECK(ncclAllReduce(diff_, diff_, static_cast<int>(size_),
+                             nccl::dataType<Dtype>::type, ncclSum, comm_,
+                             cudaStreamDefault));
+    caffe_gpu_scal(static_cast<int>(size_),
+                   (Dtype) 1.0 / Caffe::solver_count(), diff_);
   }
-#endif
 }
 
 template<typename Dtype>
-void P2PSync<Dtype>::Prepare(const vector<int>& gpus,
-            vector<shared_ptr<P2PSync<Dtype> > >* syncs) {
-  // Pair devices for map-reduce synchronization
-  vector<DevicePair> pairs;
-  DevicePair::compute(gpus, &pairs);
-  ostringstream s;
-  for (int i = 1; i < pairs.size(); ++i) {
-    s << (i == 1 ? "" : ", ") << pairs[i].parent() << ":" << pairs[i].device();
+class Worker : public InternalThread {
+ public:
+  explicit Worker(shared_ptr<Solver<Dtype> > rank0, int device,
+                  boost::barrier* barrier, vector<NCCL<Dtype>*>* nccls,
+                  const char* restore)
+    : rank0_(rank0), device_(device), barrier_(barrier),
+      nccls_(nccls), restore_(restore) {
   }
-  LOG(INFO)<< "GPUs pairs " << s.str();
-
-  SolverParameter param(solver_->param());
-
-  // Build the GPU tree by finding the parent for each solver
-  for (int attempts = 0; attempts < pairs.size(); ++attempts) {
-    for (int i = 1; i < pairs.size(); ++i) {
-      if (!syncs->at(i).get()) {
-        P2PSync<Dtype>* parent = NULL;
-        for (int j = 0; j < syncs->size(); ++j) {
-          P2PSync<Dtype>* sync = j == 0 ? this : syncs->at(j).get();
-          if (sync) {
-            const SolverParameter& p = sync->solver()->param();
-            if (p.device_id() == pairs[i].parent()) {
-              parent = sync;
-            }
-          }
-        }
-        if (parent) {
-          param.set_device_id(pairs[i].device());
-          syncs->at(i).reset(new P2PSync<Dtype>(solver_, parent, param));
-          parent->children_.push_back((P2PSync<Dtype>*) syncs->at(i).get());
-        }
+  virtual ~Worker() {}
+
+ protected:
+  void InternalThreadEntry() {
+    // Create solver and install callbacks
+    SolverParameter param(rank0_->param());
+    param.set_device_id(device_);
+#ifdef DEBUG
+    int device;
+    CUDA_CHECK(cudaGetDevice(&device));
+    CHECK_EQ(device, device_);
+#endif
+    param.set_type(rank0_->type());
+    shared_ptr<Solver<Dtype> > s(SolverRegistry<Dtype>::CreateSolver(param));
+    CHECK_EQ(s->type(), rank0_->type());
+    if (restore_) {
+      // Broadcasting solver state via NCCL proved unreliable: it seems to
+      // crash when called in a tight loop, regardless of barriers etc., so
+      // instead restore every solver from the snapshot file.
+      s->Restore(restore_);
+    }
+    NCCL<Dtype> nccl(s);
+    nccl.set_barrier(barrier_);
+    s->add_callback(&nccl);
+    if (s->param().layer_wise_reduce()) {
+      s->net()->add_after_backward(&nccl);
+    }
+    (*nccls_)[Caffe::solver_rank()] = &nccl;
+    // Wait for other threads
+    barrier_->wait();
+    // Wait for NCCL init
+    barrier_->wait();
+    // Broadcast rank 0 state
+    nccl.Broadcast();
+    // Solve
+    s->Step(param.max_iter() - s->iter());
+    barrier_->wait();
+#ifdef DEBUG
+    // Check all solvers have same state
+    SGDSolver<Dtype>* sa = static_cast<SGDSolver<Dtype>*>(rank0_.get());
+    SGDSolver<Dtype>* sb = static_cast<SGDSolver<Dtype>*>(s.get());
+    for (int h = 0; h < sa->history().size(); ++h) {
+      CUDA_CHECK(cudaSetDevice(sa->param().device_id()));
+      const Dtype* a = sa->history()[h]->cpu_data();
+      CUDA_CHECK(cudaSetDevice(sb->param().device_id()));
+      const Dtype* b = sb->history()[h]->cpu_data();
+      for (int v = 0; v < sa->history()[h]->count(); ++v) {
+        CHECK_DOUBLE_EQ(a[v], b[v]);
       }
     }
+#endif
   }
-}
-
-template<typename Dtype>
-void P2PSync<Dtype>::Run(const vector<int>& gpus) {
-  vector<shared_ptr<P2PSync<Dtype> > > syncs(gpus.size());
-  Prepare(gpus, &syncs);
 
-  LOG(INFO)<< "Starting Optimization";
+  shared_ptr<Solver<Dtype> > rank0_;
+  int device_;
+  boost::barrier* barrier_;
+  vector<NCCL<Dtype>*>* nccls_;
+  const char* restore_;
+};
 
-  for (int i = 1; i < syncs.size(); ++i) {
-    syncs[i]->StartInternalThread();
+template<typename Dtype>
+void NCCL<Dtype>::Run(const vector<int>& gpus, const char* restore) {
+  boost::barrier barrier(static_cast<int>(gpus.size()));
+  vector<NCCL<Dtype>*> nccls(gpus.size());
+  // Create workers
+  vector<shared_ptr<Worker<Dtype> > > workers(gpus.size());
+  for (int i = 1; i < gpus.size(); ++i) {
+    CUDA_CHECK(cudaSetDevice(gpus[i]));
+    Caffe::set_solver_rank(i);
+    Worker<Dtype>* w = new Worker<Dtype>(solver_, gpus[i], &barrier,
+                                         &nccls, restore);
+    w->StartInternalThread();
+    workers[i].reset(w);
   }
-
-  // Run root solver on current thread
+  CUDA_CHECK(cudaSetDevice(gpus[0]));
+  Caffe::set_solver_rank(0);
+  barrier_ = &barrier;
+  solver_->add_callback(this);
+  if (solver_->param().layer_wise_reduce()) {
+    solver_->net()->add_after_backward(this);
+  }
+  nccls[0] = this;
+  // Wait for workers
+  barrier.wait();
+  // Init NCCL
+  InitSingleProcess(&nccls);
+  barrier.wait();
+  // Run first solver on current thread
+  Broadcast();
   solver_->Solve();
-
-  for (int i = 1; i < syncs.size(); ++i) {
-    syncs[i]->StopInternalThread();
+  barrier.wait();  // Hangs without it when running tests
+  // Wait for shutdown
+  for (int i = 1; i < gpus.size(); ++i) {
+    workers[i]->StopInternalThread();
   }
 }
 
 INSTANTIATE_CLASS(Params);
 INSTANTIATE_CLASS(GPUParams);
-INSTANTIATE_CLASS(P2PSync);
+INSTANTIATE_CLASS(Worker);
+INSTANTIATE_CLASS(NCCL);
 
 }  // namespace caffe
+
+#endif  // USE_NCCL
index 430a0de..1c85f69 100644 (file)
@@ -98,7 +98,7 @@ message NetParameter {
 // NOTE
 // Update the next available ID when you add a new SolverParameter field.
 //
-// SolverParameter next available ID: 41 (last added: type)
+// SolverParameter next available ID: 42 (last added: layer_wise_reduce)
 message SolverParameter {
   //////////////////////////////////////////////////////////////////////////////
   // Specifying the train and test networks
@@ -239,6 +239,9 @@ message SolverParameter {
   }
   // DEPRECATED: use type instead of solver_type
   optional SolverType solver_type = 30 [default = SGD];
+
+  // Overlap compute and communication for data parallel training
+  optional bool layer_wise_reduce = 41 [default = true];
 }
 
 // A message that stores the solver snapshots
@@ -655,8 +658,8 @@ message DataParameter {
   optional bool mirror = 6 [default = false];
   // Force the encoded image to have 3 color channels
   optional bool force_encoded_color = 9 [default = false];
-  // Prefetch queue (Number of batches to prefetch to host memory, increase if
-  // data access bandwidth varies).
+  // Prefetch queue (Increase if data feeding bandwidth varies, within the
+  // limit of device memory for GPU training)
   optional uint32 prefetch = 10 [default = 4];
 }
 
index ece3913..1c1a9e5 100644 (file)
@@ -26,16 +26,14 @@ SolverAction::Enum Solver<Dtype>::GetRequestedAction() {
 }
 
 template <typename Dtype>
-Solver<Dtype>::Solver(const SolverParameter& param, const Solver* root_solver)
-    : net_(), callbacks_(), root_solver_(root_solver),
-      requested_early_exit_(false) {
+Solver<Dtype>::Solver(const SolverParameter& param)
+    : net_(), callbacks_(), requested_early_exit_(false) {
   Init(param);
 }
 
 template <typename Dtype>
-Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
-    : net_(), callbacks_(), root_solver_(root_solver),
-      requested_early_exit_(false) {
+Solver<Dtype>::Solver(const string& param_file)
+    : net_(), callbacks_(), requested_early_exit_(false) {
   SolverParameter param;
   ReadSolverParamsFromTextFileOrDie(param_file, &param);
   Init(param);
@@ -43,15 +41,13 @@ Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
 
 template <typename Dtype>
 void Solver<Dtype>::Init(const SolverParameter& param) {
-  CHECK(Caffe::root_solver() || root_solver_)
-      << "root_solver_ needs to be set for all non-root solvers";
   LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: "
     << std::endl << param.DebugString();
   param_ = param;
   CHECK_GE(param_.average_loss(), 1) << "average_loss should be non-negative.";
   CheckSnapshotWritePermissions();
-  if (Caffe::root_solver() && param_.random_seed() >= 0) {
-    Caffe::set_random_seed(param_.random_seed());
+  if (param_.random_seed() >= 0) {
+    Caffe::set_random_seed(param_.random_seed() + Caffe::solver_rank());
   }
   // Scaffolding code
   InitTrainNet();
@@ -101,11 +97,7 @@ void Solver<Dtype>::InitTrainNet() {
   net_state.MergeFrom(net_param.state());
   net_state.MergeFrom(param_.train_state());
   net_param.mutable_state()->CopyFrom(net_state);
-  if (Caffe::root_solver()) {
-    net_.reset(new Net<Dtype>(net_param));
-  } else {
-    net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
-  }
+  net_.reset(new Net<Dtype>(net_param));
 }
 
 template <typename Dtype>
@@ -180,12 +172,7 @@ void Solver<Dtype>::InitTestNets() {
     net_params[i].mutable_state()->CopyFrom(net_state);
     LOG(INFO)
         << "Creating test net (#" << i << ") specified by " << sources[i];
-    if (Caffe::root_solver()) {
-      test_nets_[i].reset(new Net<Dtype>(net_params[i]));
-    } else {
-      test_nets_[i].reset(new Net<Dtype>(net_params[i],
-          root_solver_->test_nets_[i].get()));
-    }
+    test_nets_[i].reset(new Net<Dtype>(net_params[i]));
     test_nets_[i]->set_debug_info(param_.debug_info());
   }
 }
@@ -197,14 +184,16 @@ void Solver<Dtype>::Step(int iters) {
   int average_loss = this->param_.average_loss();
   losses_.clear();
   smoothed_loss_ = 0;
+  iteration_timer_.Start();
 
   while (iter_ < stop_iter) {
     // zero-init the params
     net_->ClearParamDiffs();
     if (param_.test_interval() && iter_ % param_.test_interval() == 0
-        && (iter_ > 0 || param_.test_initialization())
-        && Caffe::root_solver()) {
-      TestAll();
+        && (iter_ > 0 || param_.test_initialization())) {
+      if (Caffe::root_solver()) {
+        TestAll();
+      }
       if (requested_early_exit_) {
         // Break out of the while loop because stop was requested while testing.
         break;
@@ -225,8 +214,13 @@ void Solver<Dtype>::Step(int iters) {
     // average the loss across iterations for smoothed reporting
     UpdateSmoothedLoss(loss, start_iter, average_loss);
     if (display) {
+      float lapse = iteration_timer_.Seconds();
+      float per_s = (iter_ - iterations_last_) / (lapse ? lapse : 1);
       LOG_IF(INFO, Caffe::root_solver()) << "Iteration " << iter_
-          << ", loss = " << smoothed_loss_;
+          << " (" << per_s << " iter/s, " << lapse << "s/"
+          << param_.display() << " iters), loss = " << smoothed_loss_;
+      iteration_timer_.Start();
+      iterations_last_ = iter_;
       const vector<Blob<Dtype>*>& result = net_->output_blobs();
       int score_index = 0;
       for (int j = 0; j < result.size(); ++j) {
index e78eadc..d8107e1 100644 (file)
@@ -12,7 +12,6 @@ void adagrad_update_gpu(int N, Dtype* g, Dtype* h, Dtype delta,
 
 template <typename Dtype>
 void AdaGradSolver<Dtype>::ComputeUpdateValue(int param_id, Dtype rate) {
-  CHECK(Caffe::root_solver());
   const vector<Blob<Dtype>*>& net_params = this->net_->learnable_params();
   const vector<float>& net_params_lr = this->net_->params_lr();
   Dtype delta = this->param_.delta();
index 23ab2d4..7c1fac1 100644 (file)
@@ -12,7 +12,6 @@ void nesterov_update_gpu(int N, Dtype* g, Dtype* h, Dtype momentum,
 
 template <typename Dtype>
 void NesterovSolver<Dtype>::ComputeUpdateValue(int param_id, Dtype rate) {
-  CHECK(Caffe::root_solver());
   const vector<Blob<Dtype>*>& net_params = this->net_->learnable_params();
   const vector<float>& net_params_lr = this->net_->params_lr();
   Dtype momentum = this->param_.momentum();
index f30f316..ad6abe5 100644 (file)
@@ -100,10 +100,10 @@ void SGDSolver<Dtype>::ClipGradients() {
 
 template <typename Dtype>
 void SGDSolver<Dtype>::ApplyUpdate() {
-  CHECK(Caffe::root_solver());
   Dtype rate = GetLearningRate();
   if (this->param_.display() && this->iter_ % this->param_.display() == 0) {
-    LOG(INFO) << "Iteration " << this->iter_ << ", lr = " << rate;
+    LOG_IF(INFO, Caffe::root_solver()) << "Iteration " << this->iter_
+        << ", lr = " << rate;
   }
   ClipGradients();
   for (int param_id = 0; param_id < this->net_->learnable_params().size();
index 4d35641..88d9b78 100644 (file)
@@ -3,26 +3,41 @@
 #include "caffe/util/math_functions.hpp"
 
 namespace caffe {
+SyncedMemory::SyncedMemory()
+  : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED),
+    own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false) {
+#ifndef CPU_ONLY
+#ifdef DEBUG
+  CUDA_CHECK(cudaGetDevice(&device_));
+#endif
+#endif
+}
+
+SyncedMemory::SyncedMemory(size_t size)
+  : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED),
+    own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false) {
+#ifndef CPU_ONLY
+#ifdef DEBUG
+  CUDA_CHECK(cudaGetDevice(&device_));
+#endif
+#endif
+}
 
 SyncedMemory::~SyncedMemory() {
+  check_device();
   if (cpu_ptr_ && own_cpu_data_) {
     CaffeFreeHost(cpu_ptr_, cpu_malloc_use_cuda_);
   }
 
 #ifndef CPU_ONLY
   if (gpu_ptr_ && own_gpu_data_) {
-    int initial_device;
-    cudaGetDevice(&initial_device);
-    if (gpu_device_ != -1) {
-      CUDA_CHECK(cudaSetDevice(gpu_device_));
-    }
     CUDA_CHECK(cudaFree(gpu_ptr_));
-    cudaSetDevice(initial_device);
   }
 #endif  // CPU_ONLY
 }
 
 inline void SyncedMemory::to_cpu() {
+  check_device();
   switch (head_) {
   case UNINITIALIZED:
     CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_);
@@ -49,10 +64,10 @@ inline void SyncedMemory::to_cpu() {
 }
 
 inline void SyncedMemory::to_gpu() {
+  check_device();
 #ifndef CPU_ONLY
   switch (head_) {
   case UNINITIALIZED:
-    CUDA_CHECK(cudaGetDevice(&gpu_device_));
     CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
     caffe_gpu_memset(size_, 0, gpu_ptr_);
     head_ = HEAD_AT_GPU;
@@ -60,7 +75,6 @@ inline void SyncedMemory::to_gpu() {
     break;
   case HEAD_AT_CPU:
     if (gpu_ptr_ == NULL) {
-      CUDA_CHECK(cudaGetDevice(&gpu_device_));
       CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
       own_gpu_data_ = true;
     }
@@ -77,11 +91,13 @@ inline void SyncedMemory::to_gpu() {
 }
 
 const void* SyncedMemory::cpu_data() {
+  check_device();
   to_cpu();
   return (const void*)cpu_ptr_;
 }
 
 void SyncedMemory::set_cpu_data(void* data) {
+  check_device();
   CHECK(data);
   if (own_cpu_data_) {
     CaffeFreeHost(cpu_ptr_, cpu_malloc_use_cuda_);
@@ -92,6 +108,7 @@ void SyncedMemory::set_cpu_data(void* data) {
 }
 
 const void* SyncedMemory::gpu_data() {
+  check_device();
 #ifndef CPU_ONLY
   to_gpu();
   return (const void*)gpu_ptr_;
@@ -102,16 +119,11 @@ const void* SyncedMemory::gpu_data() {
 }
 
 void SyncedMemory::set_gpu_data(void* data) {
+  check_device();
 #ifndef CPU_ONLY
   CHECK(data);
   if (own_gpu_data_) {
-    int initial_device;
-    cudaGetDevice(&initial_device);
-    if (gpu_device_ != -1) {
-      CUDA_CHECK(cudaSetDevice(gpu_device_));
-    }
     CUDA_CHECK(cudaFree(gpu_ptr_));
-    cudaSetDevice(initial_device);
   }
   gpu_ptr_ = data;
   head_ = HEAD_AT_GPU;
@@ -122,12 +134,14 @@ void SyncedMemory::set_gpu_data(void* data) {
 }
 
 void* SyncedMemory::mutable_cpu_data() {
+  check_device();
   to_cpu();
   head_ = HEAD_AT_CPU;
   return cpu_ptr_;
 }
 
 void* SyncedMemory::mutable_gpu_data() {
+  check_device();
 #ifndef CPU_ONLY
   to_gpu();
   head_ = HEAD_AT_GPU;
@@ -140,9 +154,9 @@ void* SyncedMemory::mutable_gpu_data() {
 
 #ifndef CPU_ONLY
 void SyncedMemory::async_gpu_push(const cudaStream_t& stream) {
+  check_device();
   CHECK(head_ == HEAD_AT_CPU);
   if (gpu_ptr_ == NULL) {
-    CUDA_CHECK(cudaGetDevice(&gpu_device_));
     CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
     own_gpu_data_ = true;
   }
@@ -153,5 +167,20 @@ void SyncedMemory::async_gpu_push(const cudaStream_t& stream) {
 }
 #endif
 
+void SyncedMemory::check_device() {
+#ifndef CPU_ONLY
+#ifdef DEBUG
+  int device;
+  cudaGetDevice(&device);
+  CHECK(device == device_);
+  if (gpu_ptr_ && own_gpu_data_) {
+    cudaPointerAttributes attributes;
+    CUDA_CHECK(cudaPointerGetAttributes(&attributes, gpu_ptr_));
+    CHECK(attributes.device == device_);
+  }
+#endif
+#endif
+}
+
 }  // namespace caffe
 
index 3e8d113..3835af1 100644 (file)
@@ -105,6 +105,32 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
     }
   }
 
+  void TestSkip() {
+    LayerParameter param;
+    param.set_phase(TRAIN);
+    DataParameter* data_param = param.mutable_data_param();
+    int batch_size = 5;
+    data_param->set_batch_size(batch_size);
+    data_param->set_source(filename_->c_str());
+    data_param->set_backend(backend_);
+    Caffe::set_solver_count(8);
+    for (int dev = 0; dev < Caffe::solver_count(); ++dev) {
+      Caffe::set_solver_rank(dev);
+      DataLayer<Dtype> layer(param);
+      layer.SetUp(blob_bottom_vec_, blob_top_vec_);
+      int label = dev;
+      for (int iter = 0; iter < 10; ++iter) {
+        layer.Forward(blob_bottom_vec_, blob_top_vec_);
+        for (int i = 0; i < batch_size; ++i) {
+          EXPECT_EQ(label % batch_size, blob_top_label_->cpu_data()[i]);
+          label += Caffe::solver_count();
+        }
+      }
+    }
+    Caffe::set_solver_count(1);
+    Caffe::set_solver_rank(0);
+  }
+
   void TestReshape(DataParameter_DB backend) {
     const int num_inputs = 5;
     // Save data of varying shapes.
@@ -356,6 +382,11 @@ TYPED_TEST(DataLayerTest, TestReadLevelDB) {
   this->TestRead();
 }
 
+TYPED_TEST(DataLayerTest, TestSkipLevelDB) {
+  this->Fill(false, DataParameter_DB_LEVELDB);
+  this->TestSkip();
+}
+
 TYPED_TEST(DataLayerTest, TestReshapeLevelDB) {
   this->TestReshape(DataParameter_DB_LEVELDB);
 }
@@ -396,6 +427,11 @@ TYPED_TEST(DataLayerTest, TestReadLMDB) {
   this->TestRead();
 }
 
+TYPED_TEST(DataLayerTest, TestSkipLMDB) {
+  this->Fill(false, DataParameter_DB_LMDB);
+  this->TestSkip();
+}
+
 TYPED_TEST(DataLayerTest, TestReshapeLMDB) {
   this->TestReshape(DataParameter_DB_LMDB);
 }
index 975a8f0..6ad0d8f 100644 (file)
@@ -36,7 +36,9 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
 
   string snapshot_prefix_;
   shared_ptr<SGDSolver<Dtype> > solver_;
-  shared_ptr<P2PSync<Dtype> > sync_;
+#ifdef USE_NCCL
+  shared_ptr<NCCL<Dtype> > nccl_;
+#endif
   int seed_;
   // Dimensions are determined by generate_sample_data.py
   // TODO this is brittle and the hdf5 file should be checked instead.
@@ -85,6 +87,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
        "lr_policy: 'fixed' "
        "iter_size: " << iter_size << " "
        "device_id: " << device_id << " "
+       "layer_wise_reduce: " << (!share_) << " "
        "net_param { "
        "  name: 'TestNetwork' "
        "  layer { "
@@ -183,7 +186,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
     }
     Caffe::set_random_seed(this->seed_);
     this->InitSolverFromProtoString(proto.str());
-    if (from_snapshot != NULL) {
+    if (from_snapshot) {
       this->solver_->Restore(from_snapshot);
       for (int i = 0; i < this->solver_->iter(); ++i) {
         this->solver_->net()->Forward();
@@ -202,9 +205,10 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
           gpus.push_back(i);
       }
       Caffe::set_solver_count(gpus.size());
-      this->sync_.reset(new P2PSync<Dtype>(
-          this->solver_, NULL, this->solver_->param()));
-      this->sync_->Run(gpus);
+#ifdef USE_NCCL
+      this->nccl_.reset(new NCCL<Dtype>(this->solver_));
+      this->nccl_->Run(gpus, from_snapshot);
+#endif
       Caffe::set_solver_count(1);
     }
     if (snapshot) {
@@ -457,12 +461,28 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
     const int kIterSize = 1;
     // Test over all numbers of devices.
     int available_devices = 1;
-#ifndef CPU_ONLY
+#ifdef USE_NCCL
     if (Caffe::mode() == Caffe::GPU) {
       CUDA_CHECK(cudaGetDeviceCount(&available_devices));
     }
 #endif
-    for (int devices = 1; devices <= available_devices; ++devices) {
+    // Testing every device count in each test is slow, so use a sparse set
+    vector<int> sizes;
+    sizes.push_back(1);
+    if (available_devices >= 2) {
+      sizes.push_back(2);
+    }
+    if (available_devices >= 3) {
+      sizes.push_back(3);
+    }
+    if (available_devices >= 8) {
+      sizes.push_back(8);
+    }
+    if (available_devices >= 16) {
+      sizes.push_back(16);
+    }
+    for (int i = 0; i < sizes.size(); ++i) {
+      int devices = sizes[i];
       // Configure batch size for single / multi device equivalence.
       // Constant data is needed for multi device as for accumulation.
       num_ = kNum * devices;
index 8884ce9..68e1028 100644 (file)
@@ -133,4 +133,34 @@ TYPED_TEST(HDF5DataLayerTest, TestRead) {
   }
 }
 
+TYPED_TEST(HDF5DataLayerTest, TestSkip) {
+  typedef typename TypeParam::Dtype Dtype;
+  LayerParameter param;
+  param.add_top("data");
+  param.add_top("label");
+
+  HDF5DataParameter* hdf5_data_param = param.mutable_hdf5_data_param();
+  int batch_size = 5;
+  hdf5_data_param->set_batch_size(batch_size);
+  hdf5_data_param->set_source(*(this->filename));
+
+  Caffe::set_solver_count(8);
+  for (int dev = 0; dev < Caffe::solver_count(); ++dev) {
+    Caffe::set_solver_rank(dev);
+
+    HDF5DataLayer<Dtype> layer(param);
+    layer.SetUp(this->blob_bottom_vec_, this->blob_top_vec_);
+    int label = dev;
+    for (int iter = 0; iter < 1; ++iter) {
+      layer.Forward(this->blob_bottom_vec_, this->blob_top_vec_);
+      for (int i = 0; i < batch_size; ++i) {
+        EXPECT_EQ(1 + label, this->blob_top_label_->cpu_data()[i]);
+        label = (label + Caffe::solver_count()) % (batch_size * 2);
+      }
+    }
+  }
+  Caffe::set_solver_count(1);
+  Caffe::set_solver_rank(0);
+}
+
 }  // namespace caffe
index 058668f..f69d210 100644 (file)
@@ -1,7 +1,6 @@
 #include <boost/thread.hpp>
 #include <string>
 
-#include "caffe/data_reader.hpp"
 #include "caffe/layers/base_data_layer.hpp"
 #include "caffe/parallel.hpp"
 #include "caffe/util/blocking_queue.hpp"
@@ -88,9 +87,5 @@ size_t BlockingQueue<T>::size() const {
 
 template class BlockingQueue<Batch<float>*>;
 template class BlockingQueue<Batch<double>*>;
-template class BlockingQueue<Datum*>;
-template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
-template class BlockingQueue<P2PSync<float>*>;
-template class BlockingQueue<P2PSync<double>*>;
 
 }  // namespace caffe
index fb1d495..491a9bd 100644 (file)
@@ -32,7 +32,7 @@ void LMDB::Open(const string& source, Mode mode) {
     MDB_CHECK(rc);
   }
 #endif
-  LOG(INFO) << "Opened lmdb " << source;
+  LOG_IF(INFO, Caffe::root_solver()) << "Opened lmdb " << source;
 }
 
 LMDBCursor* LMDB::NewCursor() {
index 4c58753..6d00102 100644 (file)
@@ -91,6 +91,26 @@ void caffe_gpu_scal<double>(const int N, const double alpha, double *X) {
 }
 
 template <>
+void caffe_gpu_scal<float>(const int N, const float alpha, float* X,
+                           cudaStream_t str) {
+  cudaStream_t initial_stream;
+  CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str));
+  CUBLAS_CHECK(cublasSscal(Caffe::cublas_handle(), N, &alpha, X, 1));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream));
+}
+
+template <>
+void caffe_gpu_scal<double>(const int N, const double alpha, double* X,
+                            cudaStream_t str) {
+  cudaStream_t initial_stream;
+  CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str));
+  CUBLAS_CHECK(cublasDscal(Caffe::cublas_handle(), N, &alpha, X, 1));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream));
+}
+
+template <>
 void caffe_gpu_axpby<float>(const int N, const float alpha, const float* X,
     const float beta, float* Y) {
   caffe_gpu_scal<float>(N, beta, Y);
index 9bf4214..3587d8a 100644 (file)
@@ -195,6 +195,7 @@ int train() {
   // If the gpus flag is not provided, allow the mode and device to be set
   // in the solver prototxt.
   if (FLAGS_gpu.size() == 0
+      && solver_param.has_solver_mode()
       && solver_param.solver_mode() == caffe::SolverParameter_SolverMode_GPU) {
       if (solver_param.has_device_id()) {
           FLAGS_gpu = "" +
@@ -244,11 +245,15 @@ int train() {
     CopyLayers(solver.get(), FLAGS_weights);
   }
 
+  LOG(INFO) << "Starting Optimization";
   if (gpus.size() > 1) {
-    caffe::P2PSync<float> sync(solver, NULL, solver->param());
-    sync.Run(gpus);
+#ifdef USE_NCCL
+    caffe::NCCL<float> nccl(solver);
+    nccl.Run(gpus, FLAGS_snapshot.size() > 0 ? FLAGS_snapshot.c_str() : NULL);
+#else
+    LOG(FATAL) << "Multi-GPU execution not available - rebuild with USE_NCCL";
+#endif
   } else {
-    LOG(INFO) << "Starting Optimization";
     solver->Solve();
   }
   LOG(INFO) << "Optimization Done.";