Add BlockingQueue for inter-thread communication
authorCyprien Noel <cyprien.noel@gmail.com>
Tue, 28 Apr 2015 21:28:04 +0000 (14:28 -0700)
committerEvan Shelhamer <shelhamer@imaginarynumber.net>
Sun, 9 Aug 2015 22:13:10 +0000 (15:13 -0700)
include/caffe/util/blocking_queue.hpp [new file with mode: 0644]
src/caffe/util/blocking_queue.cpp [new file with mode: 0644]

diff --git a/include/caffe/util/blocking_queue.hpp b/include/caffe/util/blocking_queue.hpp
new file mode 100644 (file)
index 0000000..955e12c
--- /dev/null
@@ -0,0 +1,47 @@
+#ifndef CAFFE_UTIL_BLOCKING_QUEUE_HPP_
+#define CAFFE_UTIL_BLOCKING_QUEUE_HPP_
+
+#include <queue>
+#include <string>
+
+#include "caffe/common.hpp"
+
+namespace caffe {
+
+template<typename T>
+class BlockingQueue {
+ public:
+  explicit BlockingQueue();
+
+  void push(const T& t);
+
+  bool try_pop(T* t);
+
+  // This logs a message if the threads needs to be blocked
+  // useful for detecting e.g. when data feeding is too slow
+  T pop(const string& log_on_wait = "");
+
+  bool try_peek(T* t);
+
+  // Return element without removing it
+  T peek();
+
+  size_t size() const;
+
+ protected:
+  /**
+   Move synchronization fields out instead of including boost/thread.hpp
+   to avoid a boost/NVCC issues (#1009, #1010) on OSX. Also fails on
+   Linux CUDA 7.0.18.
+   */
+  class sync;
+
+  std::queue<T> queue_;
+  shared_ptr<sync> sync_;
+
+DISABLE_COPY_AND_ASSIGN(BlockingQueue);
+};
+
+}  // namespace caffe
+
+#endif
diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp
new file mode 100644 (file)
index 0000000..73c9564
--- /dev/null
@@ -0,0 +1,86 @@
+#include <boost/thread.hpp>
+#include <string>
+
+#include "caffe/util/blocking_queue.hpp"
+
+namespace caffe {
+
+template<typename T>
+class BlockingQueue<T>::sync {
+ public:
+  mutable boost::mutex mutex_;
+  boost::condition_variable condition_;
+};
+
+template<typename T>
+BlockingQueue<T>::BlockingQueue()
+    : sync_(new sync()) {
+}
+
+template<typename T>
+void BlockingQueue<T>::push(const T& t) {
+  boost::mutex::scoped_lock lock(sync_->mutex_);
+  queue_.push(t);
+  lock.unlock();
+  sync_->condition_.notify_one();
+}
+
+template<typename T>
+bool BlockingQueue<T>::try_pop(T* t) {
+  boost::mutex::scoped_lock lock(sync_->mutex_);
+
+  if (queue_.empty()) {
+    return false;
+  }
+
+  *t = queue_.front();
+  queue_.pop();
+  return true;
+}
+
+template<typename T>
+T BlockingQueue<T>::pop(const string& log_on_wait) {
+  boost::mutex::scoped_lock lock(sync_->mutex_);
+
+  while (queue_.empty()) {
+    if (!log_on_wait.empty()) {
+      LOG_EVERY_N(INFO, 1000)<< log_on_wait;
+    }
+    sync_->condition_.wait(lock);
+  }
+
+  T t = queue_.front();
+  queue_.pop();
+  return t;
+}
+
+template<typename T>
+bool BlockingQueue<T>::try_peek(T* t) {
+  boost::mutex::scoped_lock lock(sync_->mutex_);
+
+  if (queue_.empty()) {
+    return false;
+  }
+
+  *t = queue_.front();
+  return true;
+}
+
+template<typename T>
+T BlockingQueue<T>::peek() {
+  boost::mutex::scoped_lock lock(sync_->mutex_);
+
+  while (queue_.empty()) {
+    sync_->condition_.wait(lock);
+  }
+
+  return queue_.front();
+}
+
+template<typename T>
+size_t BlockingQueue<T>::size() const {
+  boost::mutex::scoped_lock lock(sync_->mutex_);
+  return queue_.size();
+}
+
+}  // namespace caffe