From d94ca3f4b15d1e02bdf520733b04999f1134d078 Mon Sep 17 00:00:00 2001 From: Cyprien Noel Date: Tue, 28 Apr 2015 14:28:04 -0700 Subject: [PATCH] Add BlockingQueue for inter-thread communication --- include/caffe/util/blocking_queue.hpp | 47 +++++++++++++++++++ src/caffe/util/blocking_queue.cpp | 86 +++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 include/caffe/util/blocking_queue.hpp create mode 100644 src/caffe/util/blocking_queue.cpp diff --git a/include/caffe/util/blocking_queue.hpp b/include/caffe/util/blocking_queue.hpp new file mode 100644 index 0000000..955e12c --- /dev/null +++ b/include/caffe/util/blocking_queue.hpp @@ -0,0 +1,47 @@ +#ifndef CAFFE_UTIL_BLOCKING_QUEUE_HPP_ +#define CAFFE_UTIL_BLOCKING_QUEUE_HPP_ + +#include +#include + +#include "caffe/common.hpp" + +namespace caffe { + +template +class BlockingQueue { + public: + explicit BlockingQueue(); + + void push(const T& t); + + bool try_pop(T* t); + + // This logs a message if the threads needs to be blocked + // useful for detecting e.g. when data feeding is too slow + T pop(const string& log_on_wait = ""); + + bool try_peek(T* t); + + // Return element without removing it + T peek(); + + size_t size() const; + + protected: + /** + Move synchronization fields out instead of including boost/thread.hpp + to avoid a boost/NVCC issues (#1009, #1010) on OSX. Also fails on + Linux CUDA 7.0.18. + */ + class sync; + + std::queue queue_; + shared_ptr sync_; + +DISABLE_COPY_AND_ASSIGN(BlockingQueue); +}; + +} // namespace caffe + +#endif diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp new file mode 100644 index 0000000..73c9564 --- /dev/null +++ b/src/caffe/util/blocking_queue.cpp @@ -0,0 +1,86 @@ +#include +#include + +#include "caffe/util/blocking_queue.hpp" + +namespace caffe { + +template +class BlockingQueue::sync { + public: + mutable boost::mutex mutex_; + boost::condition_variable condition_; +}; + +template +BlockingQueue::BlockingQueue() + : sync_(new sync()) { +} + +template +void BlockingQueue::push(const T& t) { + boost::mutex::scoped_lock lock(sync_->mutex_); + queue_.push(t); + lock.unlock(); + sync_->condition_.notify_one(); +} + +template +bool BlockingQueue::try_pop(T* t) { + boost::mutex::scoped_lock lock(sync_->mutex_); + + if (queue_.empty()) { + return false; + } + + *t = queue_.front(); + queue_.pop(); + return true; +} + +template +T BlockingQueue::pop(const string& log_on_wait) { + boost::mutex::scoped_lock lock(sync_->mutex_); + + while (queue_.empty()) { + if (!log_on_wait.empty()) { + LOG_EVERY_N(INFO, 1000)<< log_on_wait; + } + sync_->condition_.wait(lock); + } + + T t = queue_.front(); + queue_.pop(); + return t; +} + +template +bool BlockingQueue::try_peek(T* t) { + boost::mutex::scoped_lock lock(sync_->mutex_); + + if (queue_.empty()) { + return false; + } + + *t = queue_.front(); + return true; +} + +template +T BlockingQueue::peek() { + boost::mutex::scoped_lock lock(sync_->mutex_); + + while (queue_.empty()) { + sync_->condition_.wait(lock); + } + + return queue_.front(); +} + +template +size_t BlockingQueue::size() const { + boost::mutex::scoped_lock lock(sync_->mutex_); + return queue_.size(); +} + +} // namespace caffe -- 2.7.4