return status;
}
-bool DataBuffer::getDataFromBuffer(BufferType type, vec_4d &outVec,
- vec_4d &outLabel) {
- unsigned int J, i, j, k, L, l;
- unsigned int width = input_dim.width();
- unsigned int height = input_dim.height();
- unsigned int channel = input_dim.channel();
+bool DataBuffer::getDataFromBuffer(BufferType type, float *out, float *label) {
- switch (type) {
- case BUF_TRAIN: {
- std::vector<int> list;
- while (true) {
- std::unique_lock<std::mutex> ultrain(readyTrainData);
- cv_train.wait(ultrain, [this]() -> bool { return trainReadyFlag; });
- if (trainReadyFlag == DATA_ERROR || trainReadyFlag == DATA_END) {
- if (train_data.size() < batch_size)
- return false;
- else
- break;
- }
- if (trainReadyFlag == DATA_READY && train_data.size() >= batch_size) {
- break;
- }
- }
-
- for (k = 0; k < batch_size; ++k) {
- std::vector<std::vector<std::vector<float>>> v_channel;
- for (l = 0; l < channel; ++l) {
- L = l * width * height;
- std::vector<std::vector<float>> v_height;
- for (j = 0; j < height; ++j) {
- J = L + j * width;
- std::vector<float> v_width;
- for (i = 0; i < width; ++i) {
- v_width.push_back(train_data[k][J + i]);
- }
- v_height.push_back(v_width);
- }
- v_channel.push_back(v_height);
- }
- outVec.push_back(v_channel);
- outLabel.push_back({{train_data_label[k]}});
- }
+ using QueueType = std::vector<std::vector<float>>;
- data_lock.lock();
-
- train_data.erase(train_data.begin(), train_data.begin() + batch_size);
- train_data_label.erase(train_data_label.begin(),
- train_data_label.begin() + batch_size);
- cur_train_bufsize -= batch_size;
- } break;
- case BUF_VAL: {
- std::vector<int> list;
+ auto wait_for_data_fill = [](std::mutex &ready_mutex,
+ std::condition_variable &cv, DataStatus &flag,
+ const unsigned int batch_size,
+ QueueType &queue) {
while (true) {
- std::unique_lock<std::mutex> ulval(readyValData);
- cv_val.wait(ulval, [this]() -> bool { return valReadyFlag; });
- if (valReadyFlag == DATA_ERROR || valReadyFlag == DATA_END) {
- if (val_data.size() < batch_size)
- return false;
- else
- break;
- }
- if (valReadyFlag == DATA_READY && val_data.size() >= batch_size) {
- break;
- }
- }
+ std::unique_lock<std::mutex> ul(ready_mutex);
+ cv.wait(ul, [&]() -> bool { return flag; });
+ if (flag == DATA_ERROR || flag == DATA_END)
+ return queue.size() < batch_size ? false : true;
- for (k = 0; k < batch_size; ++k) {
- std::vector<std::vector<std::vector<float>>> v_channel;
- for (l = 0; l < channel; ++l) {
- L = l * width * height;
- std::vector<std::vector<float>> v_height;
- for (j = 0; j < height; ++j) {
- J = L + j * width;
- std::vector<float> v_width;
- for (i = 0; i < width; ++i) {
- v_width.push_back(val_data[k][J + i]);
- }
- v_height.push_back(v_width);
- }
- v_channel.push_back(v_height);
- }
- outVec.push_back(v_channel);
- outLabel.push_back({{val_data_label[k]}});
+ if (flag == DATA_READY && queue.size() >= batch_size)
+ return true;
}
- data_lock.lock();
-
- val_data.erase(val_data.begin(), val_data.begin() + batch_size);
- val_data_label.erase(val_data_label.begin(),
- val_data_label.begin() + batch_size);
- cur_val_bufsize -= batch_size;
-
- } break;
- case BUF_TEST: {
- std::vector<int> list;
- while (true) {
- std::unique_lock<std::mutex> ultest(readyTestData);
- cv_test.wait(ultest, [this]() -> bool { return testReadyFlag; });
-
- if (testReadyFlag == DATA_ERROR || testReadyFlag == DATA_END) {
- if (test_data.size() < batch_size)
- return false;
- else
- break;
- }
- if (testReadyFlag == DATA_READY && test_data.size() >= batch_size) {
- break;
+ throw std::logic_error("[getDataFromBuffer] control should not reach here");
+ };
+
+ auto fill_bundled_data_from_queue =
+ [](std::mutex &q_lock, QueueType &q, const unsigned int batch_size,
+ const unsigned int feature_size, float *buf) {
+ for (unsigned int b = 0; b < batch_size; ++b)
+ std::copy(q[b].begin(), q[b].begin() + feature_size,
+ buf + b * feature_size);
+
+ q_lock.lock();
+ q.erase(q.begin(), q.begin() + batch_size);
+ q_lock.unlock();
+ };
+
+ /// facade that wait for the databuffer to be filled and pass it to outparam
+ /// note that batch_size is passed as an argument because it can vary by
+ /// BUF_TYPE later...
+ auto fill_out_params =
+ [&](std::mutex &ready_mutex, std::condition_variable &cv, DataStatus &flag,
+ QueueType &data_q, QueueType &label_q, const unsigned int batch_size,
+ unsigned int &cur_bufsize) {
+ if (!wait_for_data_fill(ready_mutex, cv, flag, batch_size, data_q)) {
+ return false;
}
- }
- for (k = 0; k < batch_size; ++k) {
- std::vector<std::vector<std::vector<float>>> v_channel;
- for (l = 0; l < channel; ++l) {
- L = l * width * height;
- std::vector<std::vector<float>> v_height;
- for (j = 0; j < height; ++j) {
- J = L + j * width;
- std::vector<float> v_width;
- for (i = 0; i < width; ++i) {
- v_width.push_back(test_data[k][J + i]);
- }
- v_height.push_back(v_width);
- }
- v_channel.push_back(v_height);
- }
- outVec.push_back(v_channel);
- outLabel.push_back({{test_data_label[k]}});
- }
+ fill_bundled_data_from_queue(data_lock, data_q, batch_size,
+ this->input_dim.getFeatureLen(), out);
+ fill_bundled_data_from_queue(data_lock, label_q, batch_size,
+ this->class_num, label);
- data_lock.lock();
- test_data.erase(test_data.begin(), test_data.begin() + batch_size);
- test_data_label.erase(test_data_label.begin(),
- test_data_label.begin() + batch_size);
- cur_test_bufsize -= batch_size;
- } break;
+ cur_bufsize -= batch_size;
+ return true;
+ };
+
+ switch (type) {
+ case BUF_TRAIN:
+ if (!fill_out_params(readyTrainData, cv_train, trainReadyFlag, train_data,
+ train_data_label, batch_size, cur_train_bufsize))
+ return false;
+ break;
+ case BUF_VAL:
+ if (!fill_out_params(readyValData, cv_val, valReadyFlag, val_data,
+ val_data_label, batch_size, cur_val_bufsize))
+ return false;
+ break;
+ case BUF_TEST:
+ if (!fill_out_params(readyTestData, cv_test, testReadyFlag, test_data,
+ test_data_label, batch_size, cur_test_bufsize))
+ return false;
+ break;
default:
ml_loge("Error: Not Supported Data Type");
return false;
break;
}
- data_lock.unlock();
return true;
}