#include "leveldb/db.h"
#include "lmdb.h"
-#include "pthread.h"
#include "hdf5.h"
#include "boost/scoped_ptr.hpp"
#include "caffe/filler.hpp"
#include "caffe/layer.hpp"
#include "caffe/proto/caffe.pb.h"
+#include "caffe/internal_thread.hpp"
namespace caffe {
// TODO: DataLayer, ImageDataLayer, and WindowDataLayer all have the
// same basic structure and a lot of duplicated code.
-// This function is used to create a pthread that prefetches the data.
template <typename Dtype>
-void* DataLayerPrefetch(void* layer_pointer);
-
-template <typename Dtype>
-class DataLayer : public Layer<Dtype> {
- // The function used to perform prefetching.
- friend void* DataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class DataLayer : public Layer<Dtype>, public InternalThread {
public:
explicit DataLayer(const LayerParameter& param)
: Layer<Dtype>(param) {}
virtual void CreatePrefetchThread();
virtual void JoinPrefetchThread();
virtual unsigned int PrefetchRand();
+ // The thread's entry point; performs the actual data prefetching.
+ virtual void InternalThreadEntry();
shared_ptr<Caffe::RNG> prefetch_rng_;
int datum_height_;
int datum_width_;
int datum_size_;
- pthread_t thread_;
Blob<Dtype> prefetch_data_;
Blob<Dtype> prefetch_label_;
Blob<Dtype> data_mean_;
Blob<Dtype> label_blob_;
};
-// This function is used to create a pthread that prefetches the data.
-template <typename Dtype>
-void* ImageDataLayerPrefetch(void* layer_pointer);
-
template <typename Dtype>
-class ImageDataLayer : public Layer<Dtype> {
- // The function used to perform prefetching.
- friend void* ImageDataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class ImageDataLayer : public Layer<Dtype>, public InternalThread {
public:
explicit ImageDataLayer(const LayerParameter& param)
: Layer<Dtype>(param) {}
virtual void CreatePrefetchThread();
virtual void JoinPrefetchThread();
virtual unsigned int PrefetchRand();
+ virtual void InternalThreadEntry();
shared_ptr<Caffe::RNG> prefetch_rng_;
vector<std::pair<std::string, int> > lines_;
int datum_height_;
int datum_width_;
int datum_size_;
- pthread_t thread_;
Blob<Dtype> prefetch_data_;
Blob<Dtype> prefetch_label_;
Blob<Dtype> data_mean_;
int pos_;
};
-// This function is used to create a pthread that prefetches the window data.
-template <typename Dtype>
-void* WindowDataLayerPrefetch(void* layer_pointer);
-
template <typename Dtype>
-class WindowDataLayer : public Layer<Dtype> {
- // The function used to perform prefetching.
- friend void* WindowDataLayerPrefetch<Dtype>(void* layer_pointer);
-
+class WindowDataLayer : public Layer<Dtype>, public InternalThread {
public:
explicit WindowDataLayer(const LayerParameter& param)
: Layer<Dtype>(param) {}
virtual void CreatePrefetchThread();
virtual void JoinPrefetchThread();
virtual unsigned int PrefetchRand();
+ virtual void InternalThreadEntry();
shared_ptr<Caffe::RNG> prefetch_rng_;
- pthread_t thread_;
Blob<Dtype> prefetch_data_;
Blob<Dtype> prefetch_label_;
Blob<Dtype> data_mean_;
--- /dev/null
+// Copyright 2014 BVLC and contributors.
+
+#ifndef CAFFE_INTERNAL_THREAD_HPP_
+#define CAFFE_INTERNAL_THREAD_HPP_
+
+#include <pthread.h>
+
+namespace caffe {
+
+/**
+ * Virtual base class encapsulating a single pthread.
+ * A subclass acquires the ability to run one background thread by
+ * implementing the virtual function InternalThreadEntry.
+ */
+class InternalThread {
+ public:
+  InternalThread() {}
+  virtual ~InternalThread() {}
+
+  /** Starts the thread. Returns the pthread_create() result: 0 (false)
+      on success, nonzero (true) on failure — which is why callers write
+      CHECK(!StartInternalThread()). **/
+  bool StartInternalThread() {
+    return pthread_create(&_thread, NULL, InternalThreadEntryFunc, this);
+  }
+
+  /** Will not return until the internal thread has exited.
+      Returns the pthread_join() result: 0 (false) on success. */
+  bool WaitForInternalThreadToExit() {
+    return pthread_join(_thread, NULL);
+  }
+
+ protected:
+  /* Implement this method in your subclass
+     with the code you want your thread to run. */
+  virtual void InternalThreadEntry() = 0;
+
+ private:
+  /* Static trampoline handed to pthread_create; forwards to the
+     subclass's InternalThreadEntry. */
+  static void * InternalThreadEntryFunc(void * This) {
+    reinterpret_cast<InternalThread *>(This)->InternalThreadEntry();
+    return NULL;
+  }
+
+  pthread_t _thread;
+};
+
+} // namespace caffe
+
+#endif  // CAFFE_INTERNAL_THREAD_HPP_
#include <stdint.h>
#include <leveldb/db.h>
-#include <pthread.h>
#include <string>
#include <vector>
namespace caffe {
+// This function is run by the prefetch pthread; it fetches the data.
template <typename Dtype>
-void* DataLayerPrefetch(void* layer_pointer) {
- CHECK(layer_pointer);
- DataLayer<Dtype>* layer = static_cast<DataLayer<Dtype>*>(layer_pointer);
- CHECK(layer);
+void DataLayer<Dtype>::InternalThreadEntry() {
Datum datum;
- CHECK(layer->prefetch_data_.count());
- Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
+ CHECK(prefetch_data_.count());
+ Dtype* top_data = prefetch_data_.mutable_cpu_data();
Dtype* top_label = NULL; // suppress warnings about uninitialized variables
- if (layer->output_labels_) {
- top_label = layer->prefetch_label_.mutable_cpu_data();
+ if (output_labels_) {
+ top_label = prefetch_label_.mutable_cpu_data();
}
- const Dtype scale = layer->layer_param_.data_param().scale();
- const int batch_size = layer->layer_param_.data_param().batch_size();
- const int crop_size = layer->layer_param_.data_param().crop_size();
- const bool mirror = layer->layer_param_.data_param().mirror();
+ const Dtype scale = this->layer_param_.data_param().scale();
+ const int batch_size = this->layer_param_.data_param().batch_size();
+ const int crop_size = this->layer_param_.data_param().crop_size();
+ const bool mirror = this->layer_param_.data_param().mirror();
if (mirror && crop_size == 0) {
LOG(FATAL) << "Current implementation requires mirror and crop_size to be "
<< "set at the same time.";
}
// datum scales
- const int channels = layer->datum_channels_;
- const int height = layer->datum_height_;
- const int width = layer->datum_width_;
- const int size = layer->datum_size_;
- const Dtype* mean = layer->data_mean_.cpu_data();
+ const int channels = datum_channels_;
+ const int height = datum_height_;
+ const int width = datum_width_;
+ const int size = datum_size_;
+ const Dtype* mean = data_mean_.cpu_data();
for (int item_id = 0; item_id < batch_size; ++item_id) {
// get a blob
- switch (layer->layer_param_.data_param().backend()) {
+ switch (this->layer_param_.data_param().backend()) {
case DataParameter_DB_LEVELDB:
- CHECK(layer->iter_);
- CHECK(layer->iter_->Valid());
- datum.ParseFromString(layer->iter_->value().ToString());
+ CHECK(iter_);
+ CHECK(iter_->Valid());
+ datum.ParseFromString(iter_->value().ToString());
break;
case DataParameter_DB_LMDB:
- CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
- &layer->mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
- datum.ParseFromArray(layer->mdb_value_.mv_data,
- layer->mdb_value_.mv_size);
+ CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+ &mdb_value_, MDB_GET_CURRENT), MDB_SUCCESS);
+ datum.ParseFromArray(mdb_value_.mv_data,
+ mdb_value_.mv_size);
break;
default:
LOG(FATAL) << "Unknown database backend";
CHECK(data.size()) << "Image cropping only support uint8 data";
int h_off, w_off;
// We only do random crop when we do training.
- if (layer->phase_ == Caffe::TRAIN) {
- h_off = layer->PrefetchRand() % (height - crop_size);
- w_off = layer->PrefetchRand() % (width - crop_size);
+ if (phase_ == Caffe::TRAIN) {
+ h_off = PrefetchRand() % (height - crop_size);
+ w_off = PrefetchRand() % (width - crop_size);
} else {
h_off = (height - crop_size) / 2;
w_off = (width - crop_size) / 2;
}
- if (mirror && layer->PrefetchRand() % 2) {
+ if (mirror && PrefetchRand() % 2) {
// Copy mirrored version
for (int c = 0; c < channels; ++c) {
for (int h = 0; h < crop_size; ++h) {
}
}
- if (layer->output_labels_) {
+ if (output_labels_) {
top_label[item_id] = datum.label();
}
// go to the next iter
- switch (layer->layer_param_.data_param().backend()) {
+ switch (this->layer_param_.data_param().backend()) {
case DataParameter_DB_LEVELDB:
- layer->iter_->Next();
- if (!layer->iter_->Valid()) {
+ iter_->Next();
+ if (!iter_->Valid()) {
// We have reached the end. Restart from the first.
DLOG(INFO) << "Restarting data prefetching from start.";
- layer->iter_->SeekToFirst();
+ iter_->SeekToFirst();
}
break;
case DataParameter_DB_LMDB:
- if (mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
- &layer->mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
+ if (mdb_cursor_get(mdb_cursor_, &mdb_key_,
+ &mdb_value_, MDB_NEXT) != MDB_SUCCESS) {
// We have reached the end. Restart from the first.
DLOG(INFO) << "Restarting data prefetching from start.";
- CHECK_EQ(mdb_cursor_get(layer->mdb_cursor_, &layer->mdb_key_,
- &layer->mdb_value_, MDB_FIRST), MDB_SUCCESS);
+ CHECK_EQ(mdb_cursor_get(mdb_cursor_, &mdb_key_,
+ &mdb_value_, MDB_FIRST), MDB_SUCCESS);
}
break;
default:
LOG(FATAL) << "Unknown database backend";
}
}
-
- return static_cast<void*>(NULL);
}
template <typename Dtype>
} else {
prefetch_rng_.reset();
}
- // Create the thread.
- CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>,
- static_cast<void*>(this))) << "Pthread execution failed.";
+ CHECK(!StartInternalThread()) << "Pthread execution failed";
}
template <typename Dtype>
void DataLayer<Dtype>::JoinPrefetchThread() {
- CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+ CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed";
}
template <typename Dtype>
#include <stdint.h>
#include <leveldb/db.h>
-#include <pthread.h>
#include <string>
#include <vector>
namespace caffe {
+// This function is run by the prefetch pthread; it fetches the data.
template <typename Dtype>
-void* ImageDataLayerPrefetch(void* layer_pointer) {
- CHECK(layer_pointer);
- ImageDataLayer<Dtype>* layer =
- reinterpret_cast<ImageDataLayer<Dtype>*>(layer_pointer);
- CHECK(layer);
+void ImageDataLayer<Dtype>::InternalThreadEntry() {
Datum datum;
- CHECK(layer->prefetch_data_.count());
- Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
- Dtype* top_label = layer->prefetch_label_.mutable_cpu_data();
- ImageDataParameter image_data_param = layer->layer_param_.image_data_param();
+ CHECK(prefetch_data_.count());
+ Dtype* top_data = prefetch_data_.mutable_cpu_data();
+ Dtype* top_label = prefetch_label_.mutable_cpu_data();
+ ImageDataParameter image_data_param = this->layer_param_.image_data_param();
const Dtype scale = image_data_param.scale();
const int batch_size = image_data_param.batch_size();
const int crop_size = image_data_param.crop_size();
<< "set at the same time.";
}
// datum scales
- const int channels = layer->datum_channels_;
- const int height = layer->datum_height_;
- const int width = layer->datum_width_;
- const int size = layer->datum_size_;
- const int lines_size = layer->lines_.size();
- const Dtype* mean = layer->data_mean_.cpu_data();
+ const int channels = datum_channels_;
+ const int height = datum_height_;
+ const int width = datum_width_;
+ const int size = datum_size_;
+ const int lines_size = lines_.size();
+ const Dtype* mean = data_mean_.cpu_data();
for (int item_id = 0; item_id < batch_size; ++item_id) {
// get a blob
- CHECK_GT(lines_size, layer->lines_id_);
- if (!ReadImageToDatum(layer->lines_[layer->lines_id_].first,
- layer->lines_[layer->lines_id_].second,
+ CHECK_GT(lines_size, lines_id_);
+ if (!ReadImageToDatum(lines_[lines_id_].first,
+ lines_[lines_id_].second,
new_height, new_width, &datum)) {
continue;
}
CHECK(data.size()) << "Image cropping only support uint8 data";
int h_off, w_off;
// We only do random crop when we do training.
- if (layer->phase_ == Caffe::TRAIN) {
- h_off = layer->PrefetchRand() % (height - crop_size);
- w_off = layer->PrefetchRand() % (width - crop_size);
+ if (phase_ == Caffe::TRAIN) {
+ h_off = PrefetchRand() % (height - crop_size);
+ w_off = PrefetchRand() % (width - crop_size);
} else {
h_off = (height - crop_size) / 2;
w_off = (width - crop_size) / 2;
}
- if (mirror && layer->PrefetchRand() % 2) {
+ if (mirror && PrefetchRand() % 2) {
// Copy mirrored version
for (int c = 0; c < channels; ++c) {
for (int h = 0; h < crop_size; ++h) {
top_label[item_id] = datum.label();
// go to the next iter
- layer->lines_id_++;
- if (layer->lines_id_ >= lines_size) {
+ lines_id_++;
+ if (lines_id_ >= lines_size) {
// We have reached the end. Restart from the first.
DLOG(INFO) << "Restarting data prefetching from start.";
- layer->lines_id_ = 0;
- if (layer->layer_param_.image_data_param().shuffle()) {
- layer->ShuffleImages();
+ lines_id_ = 0;
+ if (this->layer_param_.image_data_param().shuffle()) {
+ ShuffleImages();
}
}
}
-
- return reinterpret_cast<void*>(NULL);
}
template <typename Dtype>
prefetch_rng_.reset();
}
// Create the thread.
- CHECK(!pthread_create(&thread_, NULL, ImageDataLayerPrefetch<Dtype>,
- static_cast<void*>(this))) << "Pthread execution failed.";
+ CHECK(!StartInternalThread()) << "Pthread execution failed";
}
template <typename Dtype>
}
}
+
template <typename Dtype>
void ImageDataLayer<Dtype>::JoinPrefetchThread() {
- CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+ CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed";
}
template <typename Dtype>
// Based on data_layer.cpp by Yangqing Jia.
#include <stdint.h>
-#include <pthread.h>
#include <algorithm>
#include <string>
namespace caffe {
+// Entry point of the prefetch thread: fetches the window data.
template <typename Dtype>
-void* WindowDataLayerPrefetch(void* layer_pointer) {
- WindowDataLayer<Dtype>* layer =
- reinterpret_cast<WindowDataLayer<Dtype>*>(layer_pointer);
-
+void WindowDataLayer<Dtype>::InternalThreadEntry() {
// At each iteration, sample N windows where N*p are foreground (object)
// windows and N*(1-p) are background (non-object) windows
- Dtype* top_data = layer->prefetch_data_.mutable_cpu_data();
- Dtype* top_label = layer->prefetch_label_.mutable_cpu_data();
- const Dtype scale = layer->layer_param_.window_data_param().scale();
- const int batch_size = layer->layer_param_.window_data_param().batch_size();
- const int crop_size = layer->layer_param_.window_data_param().crop_size();
- const int context_pad = layer->layer_param_.window_data_param().context_pad();
- const bool mirror = layer->layer_param_.window_data_param().mirror();
+ Dtype* top_data = prefetch_data_.mutable_cpu_data();
+ Dtype* top_label = prefetch_label_.mutable_cpu_data();
+ const Dtype scale = this->layer_param_.window_data_param().scale();
+ const int batch_size = this->layer_param_.window_data_param().batch_size();
+ const int crop_size = this->layer_param_.window_data_param().crop_size();
+ const int context_pad = this->layer_param_.window_data_param().context_pad();
+ const bool mirror = this->layer_param_.window_data_param().mirror();
const float fg_fraction =
- layer->layer_param_.window_data_param().fg_fraction();
- const Dtype* mean = layer->data_mean_.cpu_data();
- const int mean_off = (layer->data_mean_.width() - crop_size) / 2;
- const int mean_width = layer->data_mean_.width();
- const int mean_height = layer->data_mean_.height();
+ this->layer_param_.window_data_param().fg_fraction();
+ const Dtype* mean = data_mean_.cpu_data();
+ const int mean_off = (data_mean_.width() - crop_size) / 2;
+ const int mean_width = data_mean_.width();
+ const int mean_height = data_mean_.height();
cv::Size cv_crop_size(crop_size, crop_size);
- const string& crop_mode = layer->layer_param_.window_data_param().crop_mode();
+ const string& crop_mode = this->layer_param_.window_data_param().crop_mode();
bool use_square = (crop_mode == "square") ? true : false;
// zero out batch
- caffe_set(layer->prefetch_data_.count(), Dtype(0), top_data);
+ caffe_set(prefetch_data_.count(), Dtype(0), top_data);
const int num_fg = static_cast<int>(static_cast<float>(batch_size)
* fg_fraction);
for (int is_fg = 0; is_fg < 2; ++is_fg) {
for (int dummy = 0; dummy < num_samples[is_fg]; ++dummy) {
// sample a window
- const unsigned int rand_index = layer->PrefetchRand();
+ const unsigned int rand_index = PrefetchRand();
vector<float> window = (is_fg) ?
- layer->fg_windows_[rand_index % layer->fg_windows_.size()] :
- layer->bg_windows_[rand_index % layer->bg_windows_.size()];
+ fg_windows_[rand_index % fg_windows_.size()] :
+ bg_windows_[rand_index % bg_windows_.size()];
bool do_mirror = false;
- if (mirror && layer->PrefetchRand() % 2) {
+ if (mirror && PrefetchRand() % 2) {
do_mirror = true;
}
// load the image containing the window
pair<std::string, vector<int> > image =
- layer->image_database_[window[WindowDataLayer<Dtype>::IMAGE_INDEX]];
+ image_database_[window[WindowDataLayer<Dtype>::IMAGE_INDEX]];
cv::Mat cv_img = cv::imread(image.first, CV_LOAD_IMAGE_COLOR);
if (!cv_img.data) {
LOG(ERROR) << "Could not open or find file " << image.first;
- return reinterpret_cast<void*>(NULL);
+ return;
}
const int channels = cv_img.channels();
// useful debugging code for dumping transformed windows to disk
string file_id;
std::stringstream ss;
- ss << layer->PrefetchRand();
+ ss << PrefetchRand();
ss >> file_id;
std::ofstream inf((string("dump/") + file_id +
string("_info.txt")).c_str(), std::ofstream::out);
item_id++;
}
}
-
- return reinterpret_cast<void*>(NULL);
}
template <typename Dtype>
prefetch_rng_.reset();
}
// Create the thread.
- CHECK(!pthread_create(&thread_, NULL, WindowDataLayerPrefetch<Dtype>,
- static_cast<void*>(this))) << "Pthread execution failed.";
+ CHECK(!StartInternalThread()) << "Pthread execution failed.";
}
template <typename Dtype>
void WindowDataLayer<Dtype>::JoinPrefetchThread() {
- CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+ CHECK(!WaitForInternalThreadToExit()) << "Pthread joining failed.";
}
template <typename Dtype>
#include "caffe/test/test_caffe_main.hpp"
-using std::isnan;
-
namespace caffe {
template <typename TypeParam>