NNTRAINER_SRCS := $(NNTRAINER_ROOT)/nntrainer/models/neuralnet.cpp \
$(NNTRAINER_ROOT)/nntrainer/models/model_loader.cpp \
$(NNTRAINER_ROOT)/nntrainer/models/dynamic_training_optimization.cpp \
- $(NNTRAINER_ROOT)/nntrainer/dataset/batch_queue.cpp \
+ $(NNTRAINER_ROOT)/nntrainer/dataset/iteration_queue.cpp \
$(NNTRAINER_ROOT)/nntrainer/dataset/databuffer.cpp \
$(NNTRAINER_ROOT)/nntrainer/dataset/data_iteration.cpp \
$(NNTRAINER_ROOT)/nntrainer/dataset/databuffer_factory.cpp \
class DataProducer {
public:
/**
- * @brief Iteration represent a single batch which will be in a queue
- * @todo move this to data_buffer
- * @return std::get<0>(Iteration) denotes whether this is last iteration or
- * not, if true, std::get<1>(Iteration), std::get<2>(Iteration) will be
- * ignored
- * @return std::get<1>(Iteration) denotes inputs
- * @return std::get<2>(Iteration) denotes labels
- *
- */
- using Iteration = std::tuple<bool, std::vector<Tensor>, std::vector<Tensor>>;
-
- /**
- * @brief create an iteration
- * @todo rename this to BatchGenerator
- * @return Iteration iteration, if std::get<0>(retval) == true means end of
- * iteration, at the end of the iteration, it's responsibility of @a this to
- * shuffle.
- */
- using Generator = std::function<Iteration(void)>;
-
- /**
- * @brief Sample represents a view of single element which can be fed to the
- * model. It is the smallest unit to produce a data
- * @return std::get<0>(Sample) denotes inputs
- * @return std::get<1>(Sample) denotes labels
- */
- using Sample = std::tuple<std::vector<Tensor *>, std::vector<Tensor *>>;
-
- /**
* @brief generator callable type which will fill a sample
- * @todo rename this to Generator.
* @param[in] index current index with range of [0, size() - 1]. If
* size() == SIZE_UNDEFINED, this parameter can be ignored
* @param[out] inputs tensors allocated by the caller beforehand, expected to
* be filled by this function
* @param[out] labels tensors allocated by the caller beforehand, expected to
* be filled by this function
* @return bool true if this sample is the last sample of the epoch
*
*/
- using Generator_sample = std::function<bool(
- unsigned int, /** index */
- std::vector<Tensor> & /** inputs */, std::vector<Tensor> & /** labels */)>;
+ using Generator = std::function<bool(unsigned int, /** index */
+ std::vector<Tensor> & /** inputs */,
+ std::vector<Tensor> & /** labels */)>;
constexpr inline static unsigned int SIZE_UNDEFINED =
std::numeric_limits<unsigned int>::max();
}
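// For reference, a minimal sketch of a sample-wise Generator under the new
// contract. Illustrative only (`make_counting_generator` and the fill values
// are assumptions, not part of this patch): the callable fills the
// caller-allocated tensors in place and returns true exactly for the last
// sample of an epoch.
DataProducer::Generator make_counting_generator(unsigned int num_samples) {
  return [num_samples](unsigned int idx, std::vector<Tensor> &inputs,
                       std::vector<Tensor> &labels) -> bool {
    for (auto &input : inputs) {
      input.setValue(static_cast<float>(idx)); // fill in place, no realloc
    }
    for (auto &label : labels) {
      label.setZero();
    }
    return idx == num_samples - 1; // true marks the last sample of the epoch
  };
}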
/**
- * @brief finalize the class with given properties
- * @todo remove this
- * @return Generator generator is a function that generates an iteration upon
- * call
- *
- */
- // [[deprecated("use finalize_sample instead")]]
- virtual Generator finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) = 0;
-
- /**
- * @brief finalize the class to return a immutable Generator.
- * @todo rename this to finalize.
+ * @brief finalize the class to return an immutable Generator.
* @remark this function must assume that the batch dimension of each tensor
* dimension is one. If actual dimension is not one, this function must ignore
* the batch dimension and assume it to be one.
* @return Generator generator is a function that generates a sample upon
* call.
*/
- virtual Generator_sample
- finalize_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- void *user_data = nullptr) {
- return Generator_sample();
- }
-
- /**
- * @brief get size of total dataset batch_size given input_dims, label_dims,
- * if size cannot be determined, this function must return
- * DataProducer::SIZE_UNDEFINED;
- *
- * @param input_dims input dimensions
- * @param label_dims label dimensions
- *
- * @return size calculated size
- */
- // [[deprecated("use size_sample instead")]]
- virtual unsigned int size(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const {
- return SIZE_UNDEFINED;
+ virtual Generator finalize(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims,
+ void *user_data = nullptr) {
+ return Generator();
}
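// Since this default implementation returns an empty std::function (invoking
// it would throw std::bad_function_call), a caller-side guard along these
// lines may be prudent; a sketch with assumed names, not code from this patch:
auto generator = producer->finalize(input_dims, label_dims);
if (!generator) {
  throw std::runtime_error("producer returned an empty sample generator");
}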
/**
* @brief get the total number of samples for the given input_dims and
* label_dims; if the size cannot be determined, this function must return
* DataProducer::SIZE_UNDEFINED
*
* @param input_dims input dimensions
* @param label_dims label dimensions
* @return size calculated size
*/
- virtual unsigned int
- size_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const {
+ virtual unsigned int size(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims) const {
return SIZE_UNDEFINED;
}
DataBuffer::~DataBuffer(){};
-std::future<std::shared_ptr<BatchQueue>>
-DataBuffer::startFetchWorker(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) {
- NNTR_THROW_IF(!producer, std::invalid_argument) << "producer does not exist";
- auto bq = std::make_shared<BatchQueue>(std::get<PropsBufferSize>(*db_props));
- auto generator = producer->finalize(input_dims, label_dims);
- bq_view = bq;
-
- return std::async(std::launch::async, [bq, generator] {
- while (true) {
- try {
- /// @note add dimension check in debug mode
- auto iteration = generator();
- auto last = std::get<0>(iteration);
- bq->wait_and_push(std::move(iteration));
- if (last == true) {
- break;
- }
- } catch (std::exception &e) {
- bq->wait_and_push({true, {}, {}});
- throw;
- }
- }
- return bq;
- });
-}
-
std::future<std::shared_ptr<IterationQueue>>
-DataBuffer::startFetchWorker_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- bool shuffle) {
+DataBuffer::startFetchWorker(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims,
+ bool shuffle) {
NNTR_THROW_IF(!producer, std::runtime_error) << "producer does not exist";
NNTR_THROW_IF(input_dims.empty(), std::runtime_error)
<< "There must be at least one input";
auto q_size = std::get<PropsBufferSize>(*db_props);
auto iq = std::make_shared<IterationQueue>(q_size, input_dims, label_dims);
- auto generator = producer->finalize_sample(input_dims, label_dims);
- auto size = producer->size_sample(input_dims, label_dims);
+ auto generator = producer->finalize(input_dims, label_dims);
+ auto size = producer->size(input_dims, label_dims);
iq_view = iq;
class NotifyOnDestruct {
});
}
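// Usage sketch (names `db`, `in_dims`, `label_dims` are assumed for
// illustration): start the worker once per epoch, poll fetch() until it hands
// back an empty view, then collect the future so that an exception thrown
// inside the worker surfaces to the caller.
auto future_iq = db.startFetchWorker(in_dims, label_dims, /* shuffle= */ true);
while (true) {
  ScopedView<Iteration> iter_view = db.fetch();
  if (iter_view.isEmpty()) {
    break; // end of epoch; the queue view is invalidated
  }
  auto &iter = iter_view.get();
  // consume iter.getInputsRef() / iter.getLabelsRef() here
} // ~ScopedView returns the iteration slot to the queue
future_iq.get(); // rethrows any exception raised in the fetch worker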
-std::unique_ptr<DataProducer::Iteration> DataBuffer::fetch() {
- NNTR_THROW_IF(!producer, std::runtime_error) << "producer does not exist";
- auto bq = bq_view.lock();
- NNTR_THROW_IF(!bq, std::runtime_error)
- << "Cannot fetch, either fetcher is not running or fetcher has ended and "
- "invalidated";
- auto iteration = bq->wait_and_pop();
- NNTR_THROW_IF(!iteration, std::runtime_error)
- << "fetched iteration is null, should not happen at all cases";
-
- /// if last equals true, resets bq_view
- if (std::get<0>(*iteration) == true) {
- bq_view.reset();
- }
- return iteration;
-}
-
-ScopedView<Iteration> DataBuffer::fetch_sample() {
+ScopedView<Iteration> DataBuffer::fetch() {
NNTR_THROW_IF(!producer, std::runtime_error) << "producer does not exist";
auto iq = iq_view.lock();
NNTR_THROW_IF(!iq, std::runtime_error)
return iq->requestFilled();
}
-DataProducer::Generator
-DataBuffer::batcher(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) {
- NNTR_THROW_IF(!producer, std::invalid_argument) << "producer does not exist";
- return producer->finalize(input_dims, label_dims);
-}
-
-std::tuple<DataProducer::Generator_sample /** generator */,
- unsigned int /** size */>
+std::tuple<DataProducer::Generator /** generator */, unsigned int /** size */>
DataBuffer::getGenerator(const std::vector<TensorDim> &input_dims,
const std::vector<TensorDim> &label_dims) {
NNTR_THROW_IF(!producer, std::invalid_argument) << "producer does not exist";
- return {producer->finalize_sample(input_dims, label_dims),
- producer->size_sample(input_dims, label_dims)};
+ return {producer->finalize(input_dims, label_dims),
+ producer->size(input_dims, label_dims)};
}
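// Sketch of driving the producer directly through getGenerator(), bypassing
// the background queue (assumed names; per the finalize() remark, dimensions
// are treated as batch-one samples). The caller owns the sample tensors and
// reuses them across calls.
auto [generator, size] = db.getGenerator(in_dims, label_dims);

std::vector<Tensor> inputs, labels; // caller-allocated, reused per sample
for (auto &dim : in_dims)
  inputs.emplace_back(dim);
for (auto &dim : label_dims)
  labels.emplace_back(dim);

for (unsigned int i = 0; i < size; ++i) {
  bool last = generator(i, inputs, labels); // fills the tensors in place
  // ... consume the filled sample here ...
  if (last) {
    break; // also ends the loop when size == DataProducer::SIZE_UNDEFINED
  }
}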
void DataBuffer::displayProgress(const int count, float loss) {
#include <tuple>
#include <vector>
-#include <batch_queue.h>
#include <data_producer.h>
#include <dataset.h>
+#include <iteration_queue.h>
#include <tensor_dim.h>
namespace nntrainer {
* all.
* @param input_dims dimension of input_dims
* @param label_dims dimension of label_dims
- * @return std::future<std::shared_ptr<BatchQueue>> Buffer Queue object,
- * release this pointer after calling @a fetch() is done to invalidate
- * subsequent call of @a fetch()
- */
- std::future<std::shared_ptr<BatchQueue>>
- startFetchWorker(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims);
-
- /**
- * @brief prepare iteration a head of time with a dedicated worker. The
- * iteration prepared can be retrieved with @a fetch();
- * @remark the batch dimension of input_dims / label_dims must be same for
- * all.
- * @param input_dims dimension of input_dims
- * @param label_dims dimension of label_dims
* @param shuffle whether to shuffle the fetching order
* @return std::future<std::shared_ptr<IterationQueue>> Buffer Queue object,
* release this pointer after calling @a fetch() is done to invalidate
* subsequent call of @a fetch()
*/
std::future<std::shared_ptr<IterationQueue>>
- startFetchWorker_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- bool shuffle = true);
-
- /**
- * @brief Get the Iteration object
- * @note the first element of returned Iteration denotes whether current
- * epoch has ended.
- *
- * @throw std::invalid_argument if @a startFetchWorker hasn't been called or
- * the return value of startFetchWorker has been invalidated.
- * @return std::unique_ptr<DataProducer::Iteration> iteration
- */
- std::unique_ptr<DataProducer::Iteration> fetch();
+ startFetchWorker(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims,
+ bool shuffle = true);
/**
* @brief Get the Iteration object
* @return ScopedView<Iteration> the resource is released back to the
* databuffer when the returned ScopedView<Iteration> is destructed
*/
- ScopedView<Iteration> fetch_sample();
-
- /**
- * @brief Get the Generator object and the generator object returns a batch
- * upon call
- * @remark the batch dimension of input_dims / label_dims must be same for
- * all.
- *
- * @param input_dims dimension of input_dims
- * @param label_dims dimension of label_dims
- * @return DataProducer::Generator which generates an iteration
- */
- DataProducer::Generator batcher(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims);
+ ScopedView<Iteration> fetch();
/**
* @brief Get the Generator object; the returned generator fills one sample
* @param label_dims dimension of label_dims
* @return std::tuple<DataProducer::Generator, unsigned int> the sample
* generator and the dataset size
*/
- std::tuple<DataProducer::Generator_sample /**< callback */,
+ std::tuple<DataProducer::Generator /**< callback */,
unsigned int /**< size */>
getGenerator(const std::vector<TensorDim> &input_dims,
const std::vector<TensorDim> &label_dims);
protected:
std::shared_ptr<DataProducer> producer;
- std::weak_ptr<BatchQueue> bq_view;
std::weak_ptr<IterationQueue> iq_view;
using Props = std::tuple<PropsBufferSize>;
std::unique_ptr<Props> db_props;
DataProducer::Generator
FuncDataProducer::finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) {
- NNTR_THROW_IF(!this->cb, std::invalid_argument)
- << "given callback is nullptr!";
-
- auto input_data = std::shared_ptr<float *>(new float *[input_dims.size()],
- std::default_delete<float *[]>());
- auto label_data = std::shared_ptr<float *>(new float *[label_dims.size()],
- std::default_delete<float *[]>());
-
- return [cb = this->cb, ud = this->user_data_prop->get(), input_dims,
- label_dims, input_data, label_data]() -> DataProducer::Iteration {
- std::vector<Tensor> inputs;
- inputs.reserve(input_dims.size());
-
- float **input_data_raw = input_data.get();
- float **label_data_raw = label_data.get();
-
- for (unsigned int i = 0; i < input_dims.size(); ++i) {
- inputs.emplace_back(input_dims[i]);
- *(input_data_raw + i) = inputs.back().getData();
- }
-
- std::vector<Tensor> labels;
- labels.reserve(label_dims.size());
-
- for (unsigned int i = 0; i < label_dims.size(); ++i) {
- labels.emplace_back(label_dims[i]);
- *(label_data_raw + i) = labels.back().getData();
- }
-
- bool last = false;
- int status = cb(input_data_raw, label_data_raw, &last, ud);
- NNTR_THROW_IF(status != ML_ERROR_NONE, std::invalid_argument)
- << "[DataProducer] Callback returned error: " << status << '\n';
-
- if (last) {
- return {true, {}, {}};
- } else {
- return {false, inputs, labels};
- }
- };
-}
-
-DataProducer::Generator_sample
-FuncDataProducer::finalize_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- void *user_data) {
+ const std::vector<TensorDim> &label_dims,
+ void *user_data) {
NNTR_THROW_IF(!this->cb, std::invalid_argument)
<< "given callback is nullptr!";
/**
* @copydoc DataProducer::finalize(const std::vector<TensorDim>, const
- * std::vector<TensorDim>)
- */
- DataProducer::Generator
- finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) override;
-
- /**
- * @copydoc DataProducer::finalize_sample(const std::vector<TensorDim>, const
* std::vector<TensorDim>, void* user_data)
*/
- DataProducer::Generator_sample
- finalize_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- void *user_data = nullptr) override;
+ DataProducer::Generator finalize(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims,
+ void *user_data = nullptr) override;
private:
datagen_cb cb;
/**
* Copyright (C) 2021 Jihoon Lee <jhoon.it.lee@samsung.com>
*
- * @file batch_queue.cpp
+ * @file iteration_queue.cpp
* @date 13 July 2021
* @brief This file contains thread safe queue
* @see https://github.com/nnstreamer/nntrainer
* @bug No known bugs except for NYI items
*
*/
-#include <batch_queue.h>
#include <chrono>
+#include <iteration_queue.h>
#include <mutex>
#include <nntrainer_error.h>
namespace nntrainer {
-BatchQueue::BatchQueue(unsigned int queue_capacity_) :
- queue_capacity(queue_capacity_) {
- NNTR_THROW_IF(queue_capacity == 0, std::invalid_argument)
- << "queue capacity of zero not supported!";
-}
-
-BatchQueue::BatchQueue(const BatchQueue &rhs) :
- queue_capacity(rhs.queue_capacity) {}
-
-BatchQueue &BatchQueue::operator=(const BatchQueue &rhs) {
- if (this == &rhs) {
- return *this;
- }
- this->queue_capacity = rhs.queue_capacity;
- return *this;
-}
-
-void BatchQueue::wait_and_push(T &&data) noexcept {
- std::unique_lock<std::shared_mutex> lk(q_mutex);
- q_writer_cv.wait(lk, [this] { return q.size() != queue_capacity; });
- q.push(std::make_unique<T>(data));
- lk.unlock();
- q_reader_cv.notify_one();
-}
-
-std::unique_ptr<BatchQueue::T> BatchQueue::wait_and_pop() noexcept {
- std::unique_lock<std::shared_mutex> lk(q_mutex);
- q_reader_cv.wait(lk, [this] { return !q.empty(); });
-
- /// @note this invalidates q.front(), but it is okay because it is locked and
- /// popped right away
- auto ptr = std::move(q.front());
- q.pop();
- lk.unlock();
- q_writer_cv.notify_one();
-
- return ptr;
-}
-
-bool BatchQueue::isFull() const {
- std::shared_lock<std::shared_mutex> lk(q_mutex);
- return queue_capacity == q.size();
-}
-
-bool BatchQueue::isEmpty() const {
- std::shared_lock<std::shared_mutex> lk(q_mutex);
- return q.empty();
-}
-
IterationQueue::IterationQueue(
unsigned int num_slots, const std::vector<ml::train::TensorDim> &input_dims,
const std::vector<ml::train::TensorDim> &label_dims) :
void IterationQueue::notifyEndOfRequestEmpty() {
std::unique_lock lg(empty_mutex);
- NNTR_THROW_IF(flow_state.exchange(FlowState::FLOW_STATE_STOP_REQUESTED) !=
- FlowState::FLOW_STATE_OPEN,
- std::invalid_argument)
- << "the queue expect state of "
- << static_cast<unsigned>(FlowState::FLOW_STATE_STOP_REQUESTED)
+ auto open_state = FlowState::FLOW_STATE_OPEN;
+ bool exchange_result = flow_state.compare_exchange_strong(
+ open_state, FlowState::FLOW_STATE_STOP_REQUESTED);
+ NNTR_THROW_IF(!exchange_result, std::invalid_argument)
+ << "the queue expects state of "
+ << static_cast<unsigned>(FlowState::FLOW_STATE_OPEN)
<< " but met " << static_cast<unsigned>(flow_state.load());
/// below is useful information when debugging iteration queue, but there will
/// be too much log if we turn the log on. so leaving it as a comment for now.
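// Note on the primitive used above (standard C++ semantics, illustrated with
// a self-contained toy rather than patch code): on failure,
// compare_exchange_strong overwrites its first argument with the value
// actually observed, which is why the error message must not assume
// `open_state` still holds FLOW_STATE_OPEN after a failed exchange.
#include <atomic>
#include <cassert>

int main() {
  std::atomic<int> state{2};
  int expected = 1; // the value we require the atomic to hold
  bool ok = state.compare_exchange_strong(expected, 3);
  assert(!ok);           // exchange failed: state held 2, not 1
  assert(state == 2);    // the atomic is left unchanged on failure
  assert(expected == 2); // `expected` now holds the observed value
  return 0;
}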
/**
* Copyright (C) 2021 Jihoon Lee <jhoon.it.lee@samsung.com>
*
- * @file batch_queue.h
+ * @file iteration_queue.h
* @date 13 July 2021
- * @brief This file contains thread safe queue for batch
- * @note This file is made to easily extend to type T, although it didn't to
- * save compile time
+ * @brief This file contains thread safe queue for a single iteration
* @see https://github.com/nnstreamer/nntrainer
* @author Jihoon Lee <jhoon.it.lee@samsung.com>
* @bug No known bugs except for NYI items
*
*/
-#ifndef __BATCH_QUEUE_H__
-#define __BATCH_QUEUE_H__
+#ifndef __ITERATION_QUEUE_H__
+#define __ITERATION_QUEUE_H__
#include <atomic>
#include <condition_variable>
namespace nntrainer {
/**
- * @brief Thread Safe batch queue Queue
- *
- */
-class BatchQueue {
-public:
- using T = DataProducer::Iteration; /**< Iteration as type T to leave room to
- extend the class to type T */
-
- /**
- * @brief Construct a new batch queue Queue object
- * @note this is not the size of buffersize, but it is @a
- * buffersize/batch_size the size never changes after the BatchQueue has been
- * created
- * @param queue_capacity_ max queue size
- */
- BatchQueue(unsigned int queue_capacity_);
-
- /**
- * @brief Construct a new batch queue Queue object
- * @note this does not copy the original queue, but only queue size
- * @param rhs batch queue queue to copy
- */
- BatchQueue(const BatchQueue &rhs);
-
- /**
- * @brief Copy-assign batch queue queue
- *
- * @param rhs batch queue queue to copy
- * @return BatchQueue& new queue
- */
- BatchQueue &operator=(const BatchQueue &rhs);
-
- /**
- * @brief push data to queue, if the batch queue is full, wait if full
- *
- * @param data data to put inside the batch queue
- */
- void wait_and_push(T &&data) noexcept;
-
- /**
- * @brief pop data from the queue, wait if empty
- *
- * @return std::unique_ptr<T> data
- */
- std::unique_ptr<T> wait_and_pop() noexcept;
-
- /**
- * @brief check if current queue is full
- *
- * @return bool true if full
- */
- bool isFull() const;
-
- /**
- * @brief check if current queue is empty
- *
- * @return bool true if empty
- */
- bool isEmpty() const;
-
-private:
- unsigned int queue_capacity;
- mutable std::shared_mutex q_mutex;
- std::condition_variable_any q_reader_cv;
- std::condition_variable_any q_writer_cv;
-
- std::queue<std::unique_ptr<T>> q;
-};
-
-/**
* @brief Thread Safe Queue implementation dedicated to the non-owning pointer
*
* @tparam original type of the view (T * will be pushed and pop)
filled_mutex; /**< mutex to be used when it is mutually exclusive to the
requesting filled slots */
std::condition_variable_any
- notify_emptied_cv; /**< conditional variable to wait based on the
+ notify_emptied_cv; /**< conditional variable to wait based on the
num_being_filled */
std::atomic<FlowState> flow_state; /**< flow state of the queue */
} // namespace nntrainer
-#endif // __BATCH_QUEUE_H__
+#endif // __ITERATION_QUEUE_H__
dataset_sources = [
- 'batch_queue.cpp',
+ 'iteration_queue.cpp',
'databuffer.cpp',
'data_iteration.cpp',
'databuffer_factory.cpp',
return false;
}
-unsigned int
-RandomDataOneHotProducer::size(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const {
- return std::get<PropsNumSamples>(*rd_one_hot_props).get();
-}
-
void RandomDataOneHotProducer::setProperty(
const std::vector<std::string> &properties) {
auto left = loadProperties(properties, *rd_one_hot_props);
DataProducer::Generator
RandomDataOneHotProducer::finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) {
- /** check if the given producer is ready to finalize */
- nntrainer::PropsMin min_;
- nntrainer::PropsMax max_;
- std::tie(min_, max_, std::ignore) = *rd_one_hot_props;
-
- /// @todo expand this to non onehot case
- NNTR_THROW_IF(std::any_of(label_dims.begin(), label_dims.end(),
- [](const TensorDim &dim) {
- return dim.channel() != 1 || dim.height() != 1;
- }),
- std::invalid_argument)
- << "Label dimension containing channel or height not allowed";
-
- NNTR_THROW_IF(min_.get() > max_.get(), std::invalid_argument)
- << "Min value is bigger then max value, min: " << min_.get()
- << "max: " << max_.get();
-
- NNTR_THROW_IF(size(input_dims, label_dims) == 0, std::invalid_argument)
- << "size is zero, dataproducer does not provide anything";
-
- /** prepare states for the generator */
- std::vector<std::uniform_int_distribution<unsigned int>> label_chooser_;
- label_chooser_.reserve(label_dims.size());
- std::transform(label_dims.begin(), label_dims.end(),
- std::back_inserter(label_chooser_),
- [this](const TensorDim &label_dim) {
- return std::uniform_int_distribution<unsigned int>(
- 0, label_dim.width() - 1);
- });
-
- std::mt19937 rng;
- rng.seed(getSeed());
- auto sz = size(input_dims, input_dims);
- /** DataProducer::Generator */
- return [rng, sz, input_dims, label_dims, min_ = min_.get(), max_ = max_.get(),
- current_iteration = 0ULL,
- label_chooser = std::move(label_chooser_)]() mutable {
- if (current_iteration++ == sz / input_dims[0].batch()) {
- current_iteration = 0;
- return DataProducer::Iteration(true, {}, {});
- }
-
- auto populate_tensor = [&](const TensorDim &input_dim) {
- Tensor t(input_dim);
- t.setRandUniform(min_, max_);
-
- return t;
- };
-
- auto populate_label =
- [&](const TensorDim &label_dim,
- std::uniform_int_distribution<unsigned int> &label_dist_) {
- Tensor t(label_dim);
- t.setZero();
- for (unsigned int b = 0; b < t.batch(); ++b) {
- t.setValue(b, 0, 0, label_dist_(rng), 1);
- }
- return t;
- };
-
- std::vector<Tensor> inputs;
- inputs.reserve(input_dims.size());
-
- std::vector<Tensor> labels;
- labels.reserve(label_dims.size());
-
- std::transform(input_dims.begin(), input_dims.end(),
- std::back_inserter(inputs), populate_tensor);
-
- std::transform(label_dims.begin(), label_dims.end(), label_chooser.begin(),
- std::back_inserter(labels), populate_label);
-
- return DataProducer::Iteration(false, inputs, labels);
- };
-}
-
-DataProducer::Generator_sample RandomDataOneHotProducer::finalize_sample(
- const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims, void *user_data) {
+ const std::vector<TensorDim> &label_dims,
+ void *user_data) {
/** check if the given producer is ready to finalize */
nntrainer::PropsMin min_;
nntrainer::PropsMax max_;
rng.seed(getSeed());
auto sz = size(input_dims, label_dims);
- /** DataProducer::Generator_sample */
+ /** DataProducer::Generator */
return [rng, sz, min_ = min_.get(), max_ = max_.get(),
label_chooser = std::move(label_chooser_)](
unsigned int idx, std::vector<Tensor> &inputs,
};
}
-unsigned int RandomDataOneHotProducer::size_sample(
- const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const {
+unsigned int
+RandomDataOneHotProducer::size(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims) const {
return std::get<PropsNumSamples>(*rd_one_hot_props).get();
}
} // namespace nntrainer
bool isMultiThreadSafe() const override;
/**
- * @copydoc DataProducer::size()
- */
- unsigned int size(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const override;
-
- /**
* @copydoc DataProducer::setProperty(const std::vector<std::string>
* &properties)
*/
* @copydoc DataProducer::finalize(const std::vector<TensorDim>, const
* std::vector<TensorDim>, void* user_data)
*/
- DataProducer::Generator
- finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) override;
+ DataProducer::Generator finalize(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims,
+ void *user_data = nullptr) override;
/**
* @copydoc DataProducer::size(const std::vector<TensorDim>, const
* std::vector<TensorDim>)
*/
- DataProducer::Generator_sample
- finalize_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- void *user_data = nullptr) override;
-
- /**
- * @copydoc DataProducer::size_sample(const std::vector<TensorDim>, const
- * std::vector<TensorDim>)
- */
- unsigned int
- size_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const override;
+ unsigned int size(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims) const override;
private:
using Props = std::tuple<PropsMin, PropsMax, PropsNumSamples>;
return RawFileDataProducer::type;
}
-unsigned int
-RawFileDataProducer::size(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const {
- auto size_accumulator = [](const unsigned int &a, const TensorDim &b) {
- /// @bug this must be getDataLen(), as this function will be removed, not
- /// fixed on purpose
- return a + b.getFeatureLen();
- };
-
- auto sample_size =
- std::accumulate(input_dims.begin(), input_dims.end(), 0u, size_accumulator);
- sample_size = std::accumulate(label_dims.begin(), label_dims.end(),
- sample_size, size_accumulator);
-
- NNTR_THROW_IF(sample_size == 0, std::invalid_argument)
- << "The feature size of input_dims and label_dims are zeros";
-
- auto path_prop = std::get<props::FilePath>(*raw_file_props);
- auto file_size = path_prop.file_size();
-
- /// checking alignment is a good way to make check if a file is valid,
- /// unfortunately, our test dataset does not have this property
- /// (trainingSet.dat, valSet.dat, testSet.dat) after checking, we can
- /// uncomment below line.
- // NNTR_THROW_IF((file_size % sample_size * RawFileDataProducer::pixel_size !=
- // 0),
- // std::invalid_argument)
- // << " Given file does not align with the given sample size, sample size: "
- // << sample_size << " file_size: " << file_size;
-
- return file_size / (sample_size * RawFileDataProducer::pixel_size);
-}
-
void RawFileDataProducer::setProperty(
const std::vector<std::string> &properties) {
auto left = loadProperties(properties, *raw_file_props);
DataProducer::Generator
RawFileDataProducer::finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) {
-
- /****************** Validation ****************/
+ const std::vector<TensorDim> &label_dims,
+ void *user_data) {
auto sz = size(input_dims, label_dims);
- auto batch = input_dims[0].batch();
-
- NNTR_THROW_IF(sz < batch, std::invalid_argument)
- << "calculated sample size is less than a batch";
-
- auto path_prop = std::get<props::FilePath>(*raw_file_props);
-
- auto size_accumulator = [](const unsigned int &a, const TensorDim &b) {
- return a + b.getFeatureLen();
- };
-
- auto sample_size =
- std::accumulate(input_dims.begin(), input_dims.end(), 0u, size_accumulator);
- sample_size = std::accumulate(label_dims.begin(), label_dims.end(),
- sample_size, size_accumulator);
-
- /// below works when checking alignment is correct
- // auto sample_size = path_prop.file_size() / (sz *
- // RawFileDataProducer::pixel_size);
-
- /****************** Prepare states ****************/
- std::mt19937 rng_;
- rng_.seed(getSeed());
- auto idxes_ = std::vector<unsigned int>();
- idxes_.reserve(sz);
- /// idxes point to the file position in bytes where a sample starts
- std::generate_n(std::back_inserter(idxes_), sz,
- [sample_size, current = 0ULL]() mutable {
- auto c = current;
- current += sample_size * RawFileDataProducer::pixel_size;
- return c;
- });
- /// @todo remove shuffle from here as we are migrating this to element wise
- /// operator
- std::shuffle(idxes_.begin(), idxes_.end(), rng_);
-
- auto file =
- std::make_shared<std::ifstream>(path_prop.get(), std::ios::binary);
-
- return [batch, input_dims, label_dims, rng = rng_, idxes = std::move(idxes_),
- file, current_idx = 0]() mutable -> DataProducer::Iteration {
- if (idxes.size() - current_idx < batch) {
- std::shuffle(idxes.begin(), idxes.end(), rng);
- current_idx = 0;
- return DataProducer::Iteration(true, {}, {});
- }
-
- std::vector<Tensor> inputs;
- inputs.reserve(input_dims.size());
- for (unsigned int i = 0; i < input_dims.size(); ++i) {
- inputs.emplace_back(input_dims[i]);
- }
-
- std::vector<Tensor> labels;
- labels.reserve(label_dims.size());
- for (unsigned int i = 0; i < label_dims.size(); ++i) {
- labels.emplace_back(label_dims[i]);
- }
-
- for (unsigned int b = 0; b < batch; ++b) {
- file->seekg(idxes[current_idx], std::ios_base::beg);
- for (auto &input : inputs) {
- Tensor input_slice = input.getBatchSlice(b, 1);
- input_slice.read(*file);
- }
- for (auto &label : labels) {
- Tensor label_slice = label.getBatchSlice(b, 1);
- label_slice.read(*file);
- }
-
- current_idx++;
- }
-
- return DataProducer::Iteration(false, inputs, labels);
- };
-}
-
-DataProducer::Generator_sample
-RawFileDataProducer::finalize_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- void *user_data) {
- auto sz = size_sample(input_dims, label_dims);
auto path_prop = std::get<props::FilePath>(*raw_file_props);
auto size_accumulator = [](const unsigned int &a, const TensorDim &b) {
};
}
-unsigned int RawFileDataProducer::size_sample(
- const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const {
+unsigned int
+RawFileDataProducer::size(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims) const {
auto size_accumulator = [](const unsigned int &a, const TensorDim &b) {
return a + b.getFeatureLen();
};
const std::string getType() const override;
/**
- * @copydoc DataProducer::size()
- */
- unsigned int size(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const override;
-
- /**
* @copydoc DataProducer::setProperty(const std::vector<std::string>
* &properties)
*/
/**
* @copydoc DataProducer::finalize(const std::vector<TensorDim>, const
* std::vector<TensorDim>, void* user_data)
- * @remark current implementation drops remainder that are less than the
- * batchsize, if we don't want the behavior, there needs some refactoring
- * across data processing places because we are assuming fixed batchsize at
- * this point
*/
- DataProducer::Generator
- finalize(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) override;
+ DataProducer::Generator finalize(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims,
+ void *user_data = nullptr) override;
/**
- * @copydoc DataProducer::finalize_sample(const std::vector<TensorDim>, const
+ * @copydoc DataProducer::size(const std::vector<TensorDim>, const
* std::vector<TensorDim>)
*/
- DataProducer::Generator_sample
- finalize_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims,
- void *user_data = nullptr) override;
-
- /**
- * @copydoc DataProducer::size_sample(const std::vector<TensorDim>, const
- * std::vector<TensorDim>)
- */
- unsigned int
- size_sample(const std::vector<TensorDim> &input_dims,
- const std::vector<TensorDim> &label_dims) const override;
+ unsigned int size(const std::vector<TensorDim> &input_dims,
+ const std::vector<TensorDim> &label_dims) const override;
private:
std::ifstream file;
training.loss = 0.0f;
std::future<std::shared_ptr<IterationQueue>> future_iq =
- train_buffer->startFetchWorker_sample(in_dims, label_dims, true);
+ train_buffer->startFetchWorker(in_dims, label_dims, true);
// /// @todo make this working, test buffer is running but doing nothing
// if (test_buffer != nullptr && test_buffer->isValid()) {
int count = 0;
while (true) {
- ScopedView<Iteration> iter_view = train_buffer->fetch_sample();
+ ScopedView<Iteration> iter_view = train_buffer->fetch();
if (iter_view.isEmpty()) {
break;
}
auto &iteration = iter_view.get();
if (iteration.batch() != batch_size) {
- /// this is partial batch scenario
+ /// @todo support partial batch
continue;
}
/// @todo multiple input support
unsigned int tcases = 0;
std::future<std::shared_ptr<IterationQueue>> future_iq =
- valid_buffer->startFetchWorker_sample(in_dims, label_dims, false);
+ valid_buffer->startFetchWorker(in_dims, label_dims, false);
while (true) {
- ScopedView<Iteration> iter_view = valid_buffer->fetch_sample();
+ ScopedView<Iteration> iter_view = valid_buffer->fetch();
if (iter_view.isEmpty()) {
break;
}
auto &iter = iter_view.get();
if (iter.batch() != batch_size) {
- /// this is partial batch scenario
+ /// @todo support partial batch
continue;
}
/// @todo multiple input support
return {inputs, labels};
}
-/** batchwise producer tests */
-/// @todo remove this
+void DataProducerSemantics::TearDown() {}
+
void DataProducerSemantics::SetUp() {
auto [producerFactory, properties, input_dims_, label_dims_, validator_,
result_] = GetParam();
-
- /** check if input_dims, label_dims not empty and have the same batch */
- ASSERT_FALSE(input_dims_.empty());
- ASSERT_FALSE(label_dims_.empty());
- auto b = input_dims_[0].batch();
-
- ASSERT_TRUE(std::all_of(input_dims_.begin(), input_dims_.end(),
- [b](const auto &dim) { return b == dim.batch(); }));
- ASSERT_TRUE(std::all_of(label_dims_.begin(), label_dims_.end(),
- [b](const auto &dim) { return b == dim.batch(); }));
-
producer = producerFactory(properties);
input_dims = std::move(input_dims_);
label_dims = std::move(label_dims_);
}
}
-void DataProducerSemantics::TearDown() {}
-
TEST_P(DataProducerSemantics, finalize_pn) {
if (result == DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE) {
EXPECT_ANY_THROW(producer->finalize(input_dims, label_dims));
}
auto generator = producer->finalize(input_dims, label_dims);
- if (result == DataProducerSemanticsExpectedResult::FAIL_AT_GENERATOR_CALL) {
- EXPECT_ANY_THROW(generator());
- } else {
- EXPECT_NO_THROW(generator());
- }
-}
-
-TEST_P(DataProducerSemantics, fetch_one_epoch_or_10_iteration_pn) {
- if (result != DataProducerSemanticsExpectedResult::SUCCESS) {
- return; // skip this test
- }
-
- auto generator = producer->finalize(input_dims, label_dims);
- auto sz = producer->size(input_dims, label_dims);
- bool has_fixed_size = sz != nntrainer::DataProducer::SIZE_UNDEFINED;
-
- if (!has_fixed_size) {
- sz = 10;
- }
-
- auto num_iterations = sz / input_dims[0].batch();
-
- for (unsigned i = 0; i < num_iterations; ++i) {
- auto [last, ins, labels] = generator();
-
- ASSERT_FALSE(last) << " reached last at iteration: " << i << '\n';
- if (validator) {
- ASSERT_TRUE(validator(ins, labels))
- << " failed validation for iteration: " << i << '\n';
- }
- }
-
- if (has_fixed_size) {
- {
- auto result = generator();
- bool last = std::get<0>(result);
- EXPECT_TRUE(last);
- }
-
- {
- auto [last, ins, labels] = generator();
- EXPECT_TRUE(validator(ins, labels))
- << "failed last validation after one epoch\n";
- EXPECT_FALSE(last);
- }
- }
-}
-
-/********* sample tests **********/
-void DataProducerSemantics_samples::TearDown() {}
-
-void DataProducerSemantics_samples::SetUp() {
- auto [producerFactory, properties, input_dims_, label_dims_, validator_,
- result_] = GetParam();
- producer = producerFactory(properties);
- input_dims = std::move(input_dims_);
- label_dims = std::move(label_dims_);
- result = result_;
- validator = std::move(validator_);
-
- if (result != DataProducerSemanticsExpectedResult::SUCCESS) {
- ASSERT_EQ(validator, nullptr)
- << "Given expected result of not success, validator must be empty!";
- }
-}
-
-TEST_P(DataProducerSemantics_samples, finalize_pn) {
- if (result == DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE) {
- EXPECT_ANY_THROW(producer->finalize_sample(input_dims, label_dims));
- } else {
- EXPECT_NO_THROW(producer->finalize_sample(input_dims, label_dims));
- }
-}
-
-TEST_P(DataProducerSemantics_samples, error_once_or_not_pn) {
- if (result == DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE) {
- return; // skip this test
- }
-
- auto generator = producer->finalize_sample(input_dims, label_dims);
auto sample_data = createSample(input_dims, label_dims);
if (result == DataProducerSemanticsExpectedResult::FAIL_AT_GENERATOR_CALL) {
}
}
-TEST_P(DataProducerSemantics_samples, fetch_one_epoch_or_10_iteration_pn) {
+TEST_P(DataProducerSemantics, fetch_one_epoch_or_10_iteration_pn) {
if (result != DataProducerSemanticsExpectedResult::SUCCESS) {
return; // skip this test
}
- auto generator = producer->finalize_sample(input_dims, label_dims);
- auto sz = producer->size_sample(input_dims, label_dims);
+ auto generator = producer->finalize(input_dims, label_dims);
+ auto sz = producer->size(input_dims, label_dims);
bool has_fixed_size = sz != nntrainer::DataProducer::SIZE_UNDEFINED;
if (!has_fixed_size) {
/**
* @brief Dataset Producer Semantics Tests
- * @todo remove this
*/
class DataProducerSemantics
: public ::testing::TestWithParam<DataProducerSemanticsParamType> {
};
/**
- * @brief Dataset Producer Semantics Tests
- * @todo remove suffix
- */
-class DataProducerSemantics_samples
- : public ::testing::TestWithParam<DataProducerSemanticsParamType> {
-public:
- /**
- * @brief SetUp test cases here
- *
- */
- virtual void SetUp();
-
- /**
- * @brief do here if any memory needs to be released
- *
- */
- virtual void TearDown();
-
-protected:
- std::unique_ptr<nntrainer::DataProducer>
- producer; /**< producer to be tested */
- std::vector<nntrainer::TensorDim> input_dims; /**< input dims */
- std::vector<nntrainer::TensorDim> label_dims; /**< output dims */
- DataProducerValidatorType validator; /**< result validator */
- DataProducerSemanticsExpectedResult result; /**< expected result */
-};
-
-/**
* @brief Create a Data Producer object
*
* @tparam T inherited class of data producer
'unittest_random_data_producers.cpp',
'unittest_func_data_producer.cpp',
'unittest_raw_file_data_producer.cpp',
- 'unittest_batch_queue.cpp',
+ 'unittest_iteration_queue.cpp',
'unittest_databuffer.cpp',
'unittest_data_iteration.cpp'
]
+++ /dev/null
-// SPDX-License-Identifier: Apache-2.0
-/**
- * Copyright (C) 2021 Jihoon Lee <jhoon.it.lee@samsung.com>
- *
- * @file unittest_batch_queue.cpp
- * @date 12 July 2021
- * @brief Batch Queue Test
- * @see https://github.com/nnstreamer/nntrainer
- * @author Jihoon Lee <jhoon.it.lee@samsung.com>
- * @bug No known bugs except for NYI items
- */
-
-#include <gtest/gtest.h>
-
-#include <batch_queue.h>
-#include <random_data_producers.h>
-#include <tensor.h>
-
-#include <algorithm>
-#include <future>
-#include <thread>
-#include <tuple>
-#include <vector>
-
-using namespace std::chrono_literals;
-
-/** THIS number can be divided with 3 and 4 is multiple of current multiple
- * batch scenario setup, as a @todo this should be better part of a
- * IterQueueTestParamType */
-static constexpr unsigned int DATA_SIZE = 384u;
-
-nntrainer::DataProducer::Iteration data(size_t key) {
- return {true, std::vector<nntrainer::Tensor>(key), {}};
-};
-
-void test_data(const nntrainer::DataProducer::Iteration &dat,
- size_t expected_key) {
- EXPECT_EQ(std::get<1>(dat).size(), expected_key);
-}
-
-TEST(BatchQueue, pushPop_p) {
- nntrainer::BatchQueue bq(1);
-
- EXPECT_NO_THROW(bq.wait_and_push(data(1)));
- auto result = bq.wait_and_pop();
- test_data(*result, 1);
-}
-
-TEST(BatchQueue, threadedPushPops_p) {
- /** preparing primitives */
- using namespace std::chrono_literals;
- auto push_after = [](nntrainer::BatchQueue &bq, const auto &duration,
- size_t key) {
- std::this_thread::sleep_for(duration);
- EXPECT_NO_THROW(bq.wait_and_push(data(key)));
- };
- auto pop_after = [](nntrainer::BatchQueue &bq, const auto &duration,
- size_t key) {
- std::this_thread::sleep_for(duration);
- auto result = bq.wait_and_pop();
- test_data(*result, key);
- };
-
- std::vector<std::future<void>> futures;
- {
- futures.clear();
- /// 0 -> push(1)
- /// 250ms -> pop(1)
- nntrainer::BatchQueue bq(1);
- futures.push_back(
- std::async(std::launch::async, push_after, std::ref(bq), 0ms, 1));
- futures.push_back(
- std::async(std::launch::async, pop_after, std::ref(bq), 250ms, 1));
- for (auto &future : futures) {
- future.get();
- }
- }
-
- {
- futures.clear();
- /// 0 -> pop(1)
- /// 250ms -> push(1)
- nntrainer::BatchQueue bq(1);
- futures.push_back(
- std::async(std::launch::async, pop_after, std::ref(bq), 0ms, 1));
- futures.push_back(
- std::async(std::launch::async, push_after, std::ref(bq), 250ms, 1));
- for (auto &future : futures) {
- future.get();
- }
- }
-
- {
- futures.clear();
- /// 0 -> push(1)
- /// 300ms -> push(2)
- /// 300ms -> pop(1)
- /// 500ms -> push(3)
- /// 600ms -> push(4) (waits)
- /// 750ms -> pop(2)
- /// 1000ms-> pop(3)
- nntrainer::BatchQueue bq(2);
- futures.push_back(
- std::async(std::launch::async, push_after, std::ref(bq), 0ms, 1));
- futures.push_back(
- std::async(std::launch::async, push_after, std::ref(bq), 300ms, 2));
- futures.push_back(
- std::async(std::launch::async, pop_after, std::ref(bq), 300ms, 1));
- futures.push_back(
- std::async(std::launch::async, push_after, std::ref(bq), 500ms, 3));
- futures.push_back(
- std::async(std::launch::async, push_after, std::ref(bq), 600ms, 4));
- futures.push_back(
- std::async(std::launch::async, pop_after, std::ref(bq), 750ms, 2));
- futures.push_back(
- std::async(std::launch::async, pop_after, std::ref(bq), 1000ms, 3));
- for (auto &future : futures) {
- future.get();
- }
- }
-}
-
-using IterQueueTestParamType =
- std::tuple<unsigned int /**< queue size */,
- std::vector<nntrainer::TensorDim> /**< input dimensions */,
- std::vector<nntrainer::TensorDim> /**< label dimensions */>;
-
-/**
- * @brief Iteration Queue Test
- */
-class IterQueueScenarios
- : public ::testing::TestWithParam<IterQueueTestParamType> {
-public:
- /**
- * @brief SetUp test cases here
- *
- */
- virtual void SetUp() {
- auto &[q_size, input_dims, label_dims] = GetParam();
- iq = std::make_unique<nntrainer::IterationQueue>(q_size, input_dims,
- label_dims);
- auto producer = std::make_unique<nntrainer::RandomDataOneHotProducer>();
- producer->setProperty({"num_samples=512"});
- sample_getter = producer->finalize_sample(input_dims, label_dims);
- this->input_dims = input_dims;
- this->label_dims = label_dims;
- sum_from_producer = 0;
- sum_from_consumer = 0;
- }
-
- virtual void
- produceSample(unsigned int size,
- const std::chrono::milliseconds *duration = nullptr) {
- auto sample_view = iq->requestEmpty();
- if (sample_view.isEmpty()) {
- throw std::runtime_error("sample_view is empty!");
- }
- auto &sample = sample_view.get();
- auto &inputs = sample.getInputsRef();
- auto &labels = sample.getLabelsRef();
- if (duration) {
- std::this_thread::sleep_for(*duration);
- }
- std::lock_guard<std::mutex> lg(producer_mutex);
- sample_getter(size, inputs, labels);
- sum_from_producer += getSum(inputs, labels);
- }
-
- virtual std::future<void>
- produceSampleAfter(unsigned int size,
- const std::chrono::milliseconds &duration) {
- return std::async(std::launch::async, [this, size, duration] {
- produceSample(size, &duration);
- });
- }
-
- virtual std::future<bool>
- consumeIterationAfter(const std::chrono::milliseconds &duration) {
- return std::async(std::launch::async, [this, duration] {
- std::this_thread::sleep_for(duration);
- return consumeIteration();
- });
- }
-
- virtual bool consumeIteration() {
- auto iter_view = iq->requestFilled();
- if (iter_view.isEmpty()) {
- return false;
- }
-
- auto &iter = iter_view.get();
- auto &inputs = iter.getInputsRef();
- auto &labels = iter.getLabelsRef();
- std::lock_guard<std::mutex> lg(consumer_mutex);
- sum_from_consumer += getSum(inputs, labels);
- return true;
- }
-
- /**
- * @brief do here if any memory needs to be released
- *
- */
- virtual void TearDown(){};
-
-protected:
- /**
- * @brief Get a single value (sum) for the given inputs and outputs, this is
- * to effectively reduce a tensor to a single value
- *
- * @param inputs inputs
- * @param labels labels
- * @return long double single value which sums everything
- */
- long double getSum(const std::vector<nntrainer::Tensor> &inputs,
- const std::vector<nntrainer::Tensor> &labels) {
- auto accumulator = [](long double old_sum, const nntrainer::Tensor &t) {
- return old_sum + (long double)t.sum({0, 1, 2, 3}).getValue(0, 0, 0, 0);
- };
-
- long double sum =
- std::accumulate(inputs.begin(), inputs.end(), 0.0l, accumulator);
- return std::accumulate(labels.begin(), labels.end(), sum, accumulator);
- }
-
- /** @todo currently there is an error between those two around 1e-8, this
- * should be changed to integer based comparison */
- long double
- sum_from_producer; /**< sum of all the dataset from producer side*/
- long double
- sum_from_consumer; /**< sum of all the dataset from consumer side */
-
- mutable std::mutex producer_mutex, consumer_mutex;
- nntrainer::DataProducer::Generator_sample sample_getter;
- std::unique_ptr<nntrainer::IterationQueue> iq;
- std::vector<nntrainer::TensorDim> input_dims; /**< input dims */
- std::vector<nntrainer::TensorDim> label_dims; /**< output dims */
-};
-
-TEST_P(IterQueueScenarios, produceAndConsumeSingle_p) {
- auto batch_size = iq->batch();
- for (unsigned int i = 0; i < batch_size; ++i) {
- produceSample(i);
- }
- consumeIteration();
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, produceAndConsumeOnce_p) {
- auto q_size = iq->slots();
- auto q_size_in_sample = q_size * iq->batch();
- /// step1: fill buffer to the queue (filling 0 ~ 11th samples)
- for (unsigned int i = 0; i < q_size_in_sample; ++i) {
- produceSample(i);
- }
-
- /// step2: consume the filled buffer from the queue
- for (unsigned int i = 0; i < q_size; ++i) {
- consumeIteration();
- }
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, produceAndConsumeSyncTwice_p) {
- auto q_size = iq->slots();
- auto q_size_in_sample = q_size * iq->batch();
- /// step1: fill buffer to the queue (filling full queue)
- for (unsigned int i = 0; i < q_size_in_sample; ++i) {
- produceSample(i);
- }
-
- /// step2: consume the filled buffer from the queue
- for (unsigned int i = 0; i < q_size; ++i) {
- consumeIteration();
- }
-
- /// step3: fill buffer to the queue (filling full queue)
- for (unsigned int i = q_size_in_sample; i < q_size_in_sample * 2; ++i) {
- produceSample(i);
- }
-
- /// step4: consume the filled buffer from the queue
- for (unsigned int i = 0; i < q_size; ++i) {
- consumeIteration();
- }
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, produceAndConsumeSyncMixed_p) {
- auto q_size = iq->slots();
- auto q_size_in_sample = q_size * iq->batch();
- /// step1: fill buffer to the queue (filling half samples)
- for (unsigned int i = 0; i < q_size_in_sample / 2; ++i) {
- produceSample(i);
- }
-
- /// step2: consume the filled buffer from the queue
- for (unsigned int i = 0; i < q_size / 2; ++i) {
- consumeIteration();
- }
-
- /// step3: fill buffer to the queue (filling rest half samples)
- for (unsigned int i = q_size_in_sample / 2; i < q_size_in_sample; ++i) {
- produceSample(i);
- }
-
- /// step4: consume the filled buffer from the queue
- for (unsigned int i = q_size / 2; i < q_size; ++i) {
- consumeIteration();
- }
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios,
- produceAndConsumAsyncForDeterminedSizeConsumerRunningFirst_p) {
-
- auto producer = std::async(std::launch::async, [this]() {
- std::this_thread::sleep_for(50ms);
- for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
- produceSample(i);
- }
- });
-
- auto consumer = std::async(std::launch::async, [this]() {
- for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
- consumeIteration();
- }
- });
-
- producer.get();
- consumer.get();
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios,
- produceAndConsumAsyncForDeterminedSizeProducerRunningFirst_p) {
-
- auto producer = std::async(std::launch::async, [this]() {
- for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
- produceSample(i);
- }
- });
-
- auto consumer = std::async(std::launch::async, [this]() {
- std::this_thread::sleep_for(50ms);
- for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
- consumeIteration();
- }
- });
-
- producer.get();
- consumer.get();
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios,
- produceAndConsumAsyncForUnknownSizeProducerRunningFirst_p) {
- auto producer = std::async(std::launch::async, [this]() {
- for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
- produceSample(i);
- }
- iq->notifyEndOfRequestEmpty();
- });
-
- auto consumer = std::async(std::launch::async, [this]() {
- std::this_thread::sleep_for(50ms);
- while (consumeIteration()) {
- }
- });
-
- producer.get();
- consumer.get();
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios,
- produceAndConsumAsyncForUnknownSizeConsumerRunningFirst_p) {
- auto producer = std::async(std::launch::async, [this]() {
- std::this_thread::sleep_for(50ms);
- for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
- produceSample(i);
- }
- iq->notifyEndOfRequestEmpty();
- });
-
- auto consumer = std::async(std::launch::async, [this]() {
- while (consumeIteration()) {
- }
- });
-
- producer.get();
- consumer.get();
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, produceAndConsumPartiallyFilledBatch_p) {
- auto b = iq->batch();
- if (b == 1) {
- return; /// if batch is one, there is no partially filled batch
- }
-
- auto getSumOfPartiallyFilledTensor =
- [](const nntrainer::Tensor &t, unsigned int actual_batch) -> long double {
- long double sum = 0;
- nntrainer::Tensor result = t.sum_by_batch();
- for (unsigned int i = 0; i < actual_batch; ++i) {
- sum += result.getValue(i, 0, 0, 0);
- }
- return sum;
- };
-
- for (unsigned int i = 0; i < b - 1; ++i) {
- produceSample(i);
- }
-
- iq->notifyEndOfRequestEmpty();
- {
- auto iter_view = iq->requestFilled();
- if (iter_view.isEmpty()) {
- throw std::invalid_argument("iter view is empty!");
- }
- nntrainer::Iteration &it = iter_view.get();
-
- auto &inputs = it.getInputsRef();
- auto &labels = it.getLabelsRef();
-
- for (auto &input : inputs) {
- sum_from_consumer += getSumOfPartiallyFilledTensor(input, it.batch());
- }
- for (auto &label : labels) {
- sum_from_consumer += getSumOfPartiallyFilledTensor(label, it.batch());
- }
- }
- EXPECT_FALSE(consumeIteration());
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-/**
- * When calling notifytEndOfRequestEmpty(), there are four possible states the
- * queue is in.
- *
- * notified there will be no filling while...
- * 1. the last buffer is already consumed and already marked as emptied
- * 2. the last buffer has moved to filled_q.
- * 3. the last buffer is being filled, and there are multiple buffers being
- * filled
- * 4. the last buffer is being served
- *
- */
-
-TEST_P(IterQueueScenarios, caseOneNotifyAfterConsumingIsFinished_p) {
-
- std::promise<void> consume_done;
- auto producer = std::async(std::launch::async, [this, &consume_done]() {
- for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
- produceSample(i);
- }
- consume_done.get_future().get();
- iq->notifyEndOfRequestEmpty();
- });
-
- auto consumer = std::async(std::launch::async, [this, &consume_done]() {
- for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
- consumeIteration();
- }
- consume_done.set_value();
- EXPECT_FALSE(consumeIteration());
- });
-
- producer.get();
- consumer.get();
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, caseTwoNotifyAfterTheLastBufferHasMovedToFilledQ_p) {
- std::vector<std::future<void>> producer_result;
-
- unsigned int number_of_producing = iq->batch() * iq->slots();
- producer_result.reserve(number_of_producing);
- for (unsigned int i = 0; i < number_of_producing; ++i) {
- producer_result.push_back(produceSampleAfter(i, 50ms));
- }
-
- for (auto &fut : producer_result) {
- fut.get();
- }
- iq->notifyEndOfRequestEmpty();
-
- for (unsigned int i = 0; i < iq->slots(); ++i) {
- EXPECT_TRUE(consumeIteration());
- }
- EXPECT_FALSE(consumeIteration());
-
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) {
- std::future<void> notify_result;
- {
- std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
- unsigned int number_of_producing = iq->batch() * iq->slots();
- for (unsigned int i = 0; i < number_of_producing; ++i) {
- scoped_views.push(iq->requestEmpty());
- if (scoped_views.back().isEmpty()) {
- throw std::runtime_error("sample was empty");
- }
- auto &sample = scoped_views.back().get();
- auto &inputs = sample.getInputsRef();
- auto &labels = sample.getLabelsRef();
- sample_getter(i, inputs, labels);
- sum_from_producer += getSum(inputs, labels);
- }
-
- notify_result =
- std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
- std::this_thread::sleep_for(50ms);
- /// delaying destroying scoped_views to simulate samples are in
- /// the state of being filled
- }
- notify_result.get();
-
- for (unsigned int i = 0; i < iq->slots(); ++i) {
- EXPECT_TRUE(consumeIteration());
- }
- EXPECT_FALSE(consumeIteration());
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) {
- std::future<void> notify_result;
- unsigned int number_of_producing = iq->batch() * iq->slots();
-
- std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
- for (unsigned int i = 0; i < number_of_producing; ++i) {
- produceSample(i);
- }
-
- for (unsigned int i = 0; i < iq->slots() - 1; ++i) {
- EXPECT_TRUE(consumeIteration());
- }
- {
- auto iter_view = iq->requestFilled();
- notify_result =
- std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
- if (iter_view.isEmpty()) {
- throw std::invalid_argument("iter_view is empty!");
- }
- auto &iter = iter_view.get();
- auto &inputs = iter.getInputsRef();
- auto &labels = iter.getLabelsRef();
- sum_from_consumer += getSum(inputs, labels);
- EXPECT_FALSE(consumeIteration());
-
- std::this_thread::sleep_for(50ms);
- /// delay here to delay destroying iter_view to simulate
- /// notifyEndOfRequestEmpty() is being called during destroying the last
- /// buffer
- }
-
- notify_result.get();
- EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
-}
-
-/** @todo add a test that simulates multiple workers, for now this is impossible
- * because we don't have thread safe worker */
-
-TEST_P(IterQueueScenarios, notifyEndTwice_n) {
- iq->notifyEndOfRequestEmpty();
- EXPECT_ANY_THROW(iq->notifyEndOfRequestEmpty());
-}
-
-TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) {
- iq->notifyEndOfRequestEmpty();
- EXPECT_ANY_THROW(iq->requestEmpty());
-}
-
-TEST_P(IterQueueScenarios, ScopedViewSampleHandlesThrowWhileFillingFails_n) {
- std::future<void> notify_result;
-
- std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
- std::shared_future<void> ready_future(ready_promise.get_future());
- auto request_fail = std::async(std::launch::async, [&, this] {
- t1_ready_promise.set_value();
- ready_future.wait();
- auto sample_view = iq->requestEmpty();
- throw std::invalid_argument("while filling, it failed");
- });
-
- auto try_consume = std::async(std::launch::async, [&, this] {
- t2_ready_promise.set_value();
- ready_future.wait();
- auto iter_view = iq->requestFilled();
- EXPECT_TRUE(iter_view.isEmpty());
- });
-
- t1_ready_promise.get_future().wait();
- t2_ready_promise.get_future().wait();
- ready_promise.set_value();
-
- EXPECT_THROW(request_fail.get(), std::invalid_argument);
- iq->notifyEndOfRequestEmpty();
- try_consume.wait();
-}
-
-TEST_P(IterQueueScenarios, ScopedViewIterationHandlesThrowWhileFillingFails_n) {
- std::future<void> notify_result;
-
- std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
- std::shared_future<void> ready_future(ready_promise.get_future());
- auto feed_data = std::async(std::launch::async, [&, this] {
- t1_ready_promise.set_value();
- ready_future.wait();
- for (unsigned i = 0; i < iq->batch(); ++i) {
- auto sample_view = iq->requestEmpty();
- }
- });
-
- auto consume_fail = std::async(std::launch::async, [&, this] {
- t2_ready_promise.set_value();
- ready_future.wait();
- auto iter_view = iq->requestFilled();
- throw std::invalid_argument("while using, it failed");
- });
-
- t1_ready_promise.get_future().wait();
- t2_ready_promise.get_future().wait();
- ready_promise.set_value();
-
- EXPECT_THROW(consume_fail.get(), std::invalid_argument);
- feed_data.wait();
- EXPECT_ANY_THROW(iq->requestEmpty());
- EXPECT_TRUE(iq->requestFilled().isEmpty());
-}
-
-IterQueueTestParamType multi_slot_multi_batch = {
- 4 /** queue size */,
- {{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,
- {{3, 1, 1, 8}, {3, 1, 1, 2}} /** label_dims */};
-
-IterQueueTestParamType single_slot_multi_batch = {
- 1 /** queue size */,
- {{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,
- {{3, 1, 1, 8}, {3, 1, 1, 2}} /** label_dims */};
-
-IterQueueTestParamType multi_slot_single_batch = {
- 3 /** queue size */,
- {{1, 2, 4, 5}, {1, 4, 5, 7}} /** input_dims*/,
- {{1, 1, 1, 8}, {1, 1, 1, 2}} /** label_dims */};
-
-IterQueueTestParamType single_slot_single_batch = {
- 1 /** queue size */,
- {{1, 2, 4, 5}, {1, 4, 5, 7}} /** input_dims*/,
- {{1, 1, 1, 8}, {1, 1, 1, 2}} /** label_dims */};
-
-INSTANTIATE_TEST_CASE_P(IterQueue, IterQueueScenarios,
- ::testing::Values(multi_slot_multi_batch,
- single_slot_multi_batch,
- multi_slot_single_batch,
- single_slot_single_batch));
-
-TEST(IterQueue, constructEmptySlots_n) {
- EXPECT_ANY_THROW(nntrainer::IterationQueue(0, {{1}}, {{1}}));
-}
-
-TEST(IterQueue, constructEmptyInput_n) {
- EXPECT_ANY_THROW(nntrainer::IterationQueue(1, {}, {{1}}));
-}
-
-TEST(IterQueue, constructNotConsistentBatchSizeBetweenInputs_n) {
- EXPECT_ANY_THROW(
- nntrainer::IterationQueue(1, {{3, 1, 1, 10}, {2, 1, 1, 10}}, {}));
-}
-
-TEST(IterQueue, constructNotConsistentBatchSizeInLabel_n) {
- EXPECT_ANY_THROW(nntrainer::IterationQueue(1, {{3, 1, 1, 10}, {3, 1, 1, 10}},
- {{2, 1, 1, 10}}));
-}
-
-TEST(IterQueue, constructNotConsistentBatchSizeInLabel2_n) {
- EXPECT_ANY_THROW(nntrainer::IterationQueue(1, {{3, 1, 1, 10}, {3, 1, 1, 10}},
- {{3, 1, 1, 10}, {2, 1, 1, 10}}));
-}
db.setProperty({"buffer_size=9", "min=1", "max=2", "num_samples=10"});
{ /// invalidate iq after epoch is finished
- auto future_iq = db.startFetchWorker_sample({{3, 1, 1, 2}}, {{3, 1, 1, 1}});
+ auto future_iq = db.startFetchWorker({{3, 1, 1, 2}}, {{3, 1, 1, 1}});
for (unsigned int i = 0; i < 3; ++i) {
- auto iteration_view = db.fetch_sample();
+ auto iteration_view = db.fetch();
EXPECT_FALSE(iteration_view.isEmpty());
auto &iter = iteration_view.get();
auto &inputs = iter.getInputsRef();
EXPECT_EQ(labels[0].getDim(), nntrainer::TensorDim({3, 1, 1, 1}));
}
{
- auto iteration_view = db.fetch_sample();
+ auto iteration_view = db.fetch();
EXPECT_FALSE(iteration_view.isEmpty());
EXPECT_EQ(iteration_view.get().batch(), 1u); // partial batch is allowed
}
{
- auto iteration_view = db.fetch_sample(); // no more iteration
+ auto iteration_view = db.fetch(); // no more iteration
EXPECT_TRUE(iteration_view.isEmpty());
}
nntrainer::DataBuffer db(std::move(prod));
db.setProperty({"buffer_size=2", "min=1", "max=2", "num_samples=2"});
- EXPECT_THROW(db.fetch_sample(), std::runtime_error);
+ EXPECT_THROW(db.fetch(), std::runtime_error);
}
TEST(DataBuffer, fetchAfterBqIsDeleted_n) {
nntrainer::DataBuffer db(std::move(prod));
db.setProperty({"buffer_size=4", "min=1", "max=2", "num_samples=3"});
- auto future_bq = db.startFetchWorker_sample({{4, 1, 1, 2}}, {{4, 1, 1, 1}});
+ auto future_bq = db.startFetchWorker({{4, 1, 1, 2}}, {{4, 1, 1, 1}});
future_bq.get();
- EXPECT_THROW(db.fetch_sample(), std::runtime_error);
+ EXPECT_THROW(db.fetch(), std::runtime_error);
}
INSTANTIATE_TEST_CASE_P(Func, DataProducerSemantics,
::testing::Values(func_success, func_error,
func_nullptr));
-
-INSTANTIATE_TEST_CASE_P(Func, DataProducerSemantics_samples,
- ::testing::Values(func_success, func_error,
- func_nullptr));
auto producer = std::async(std::launch::async, [this]() {
std::this_thread::sleep_for(50ms);
for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
produceSample(i);
}
});
return f;
}
-auto random_onehot_success = DataProducerSemanticsParamType(
- createDataProducer<nntrainer::RandomDataOneHotProducer>,
- {"min=0", "max=1", "num_samples=10"}, {{3, 2, 4, 5}}, {{3, 1, 1, 10}},
- random_onehot_validator(0, 1), DataProducerSemanticsExpectedResult::SUCCESS);
-
-auto random_onehot_min_over_max = DataProducerSemanticsParamType(
- createDataProducer<nntrainer::RandomDataOneHotProducer>,
- {"min=2", "max=1", "num_samples=10"}, {{3, 2, 4, 5}}, {{3, 1, 1, 10}},
- nullptr, DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE);
-
-auto random_onehot_invalid_label_shape = DataProducerSemanticsParamType(
- createDataProducer<nntrainer::RandomDataOneHotProducer>, {}, {{3, 2, 4, 5}},
- {{3, 1, 2, 10}}, nullptr,
- DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE);
-
-INSTANTIATE_TEST_CASE_P(RandomOneHot, DataProducerSemantics,
- ::testing::Values(random_onehot_success,
- random_onehot_min_over_max));
-
-auto random_onehot_success_sample_one_batch = DataProducerSemanticsParamType(
+auto random_onehot_success_one_batch = DataProducerSemanticsParamType(
createDataProducer<nntrainer::RandomDataOneHotProducer>,
{"min=0", "max=1", "num_samples=10"}, {{1, 2, 4, 5}}, {{1, 1, 1, 10}},
random_onehot_validator(0, 1), DataProducerSemanticsExpectedResult::SUCCESS);
random_onehot_validator(0, 1),
DataProducerSemanticsExpectedResult::SUCCESS);
-auto random_onehot_min_over_max_sample = DataProducerSemanticsParamType(
+auto random_onehot_min_over_max = DataProducerSemanticsParamType(
createDataProducer<nntrainer::RandomDataOneHotProducer>,
{"min=2", "max=1", "num_samples=10"}, {{1, 2, 4, 5}}, {{1, 1, 1, 10}},
nullptr, DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE);
-auto random_onehot_invalid_label_shape_sample = DataProducerSemanticsParamType(
+auto random_onehot_invalid_label_shape = DataProducerSemanticsParamType(
createDataProducer<nntrainer::RandomDataOneHotProducer>, {}, {{1, 2, 4, 5}},
{{1, 1, 2, 10}}, nullptr,
DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE);
INSTANTIATE_TEST_CASE_P(
- RandomOneHot, DataProducerSemantics_samples,
- ::testing::Values(random_onehot_success_sample_one_batch,
+ RandomOneHot, DataProducerSemantics,
+ ::testing::Values(random_onehot_success_one_batch,
random_onehot_success_multi_batch_will_be_ignored,
- random_onehot_min_over_max_sample,
- random_onehot_invalid_label_shape_sample));
+ random_onehot_min_over_max,
+ random_onehot_invalid_label_shape));
DataProducerSemanticsExpectedResult::FAIL_AT_FINALIZE);
INSTANTIATE_TEST_CASE_P(RawFile, DataProducerSemantics,
- ::testing::Values(training_set, valSet, testSet,
- batch_too_big));
-
-INSTANTIATE_TEST_CASE_P(RawFile, DataProducerSemantics_samples,
::testing::Values(training_set, valSet, testSet));