#include <tuple>
#include <vector>
+using namespace std::chrono_literals;
+
+/** Number of samples produced by the asynchronous scenarios. It is divisible
+ * by both 3 and 4 so that it is a multiple of the batch sizes used by the
+ * current multi-batch scenario setups.
+ * @todo this should rather be part of IterQueueTestParamType */
+static constexpr unsigned int DATA_SIZE = 384u;
+
/**
 * @brief Build a dummy producer iteration for the tests.
 *
 * The first element is always true, the second element holds @a key
 * default-constructed tensors and the third element is left empty.
 * NOTE(review): the meaning of the boolean is defined by
 * nntrainer::DataProducer::Iteration - confirm there.
 *
 * @param key number of default-constructed tensors to place in the result
 * @return nntrainer::DataProducer::Iteration the assembled iteration
 */
nntrainer::DataProducer::Iteration data(size_t key) {
  std::vector<nntrainer::Tensor> placeholder_tensors(key);
  return {true, placeholder_tensors, {}};
};
iq = std::make_unique<nntrainer::IterationQueue>(q_size, input_dims,
label_dims);
auto producer = std::make_unique<nntrainer::RandomDataOneHotProducer>();
- producer->setProperty({"num_samples=10000"});
+ producer->setProperty({"num_samples=512"});
sample_getter = producer->finalize_sample(input_dims, label_dims);
+ this->input_dims = input_dims;
+ this->label_dims = label_dims;
sum_from_producer = 0;
sum_from_consumer = 0;
}
/**
 * @brief Request one empty sample from the queue, fill it with sample_getter
 * and add the sum of the filled sample to sum_from_producer.
 *
 * @param size value forwarded as the first argument of sample_getter
 * @param duration optional delay; when non-null the calling thread sleeps for
 * *duration after the sample view was requested and before it is filled
 * @throws std::runtime_error if the requested sample view is empty
 */
- virtual void produceSample(unsigned int size) {
+ virtual void
+ produceSample(unsigned int size,
+ const std::chrono::milliseconds *duration = nullptr) {
auto sample_view = iq->requestEmpty();
+ if (sample_view.isEmpty()) {
+ throw std::runtime_error("sample_view is empty!");
+ }
auto &sample = sample_view.get();
auto &inputs = sample.getInputsRef();
auto &labels = sample.getLabelsRef();
/// the optional delay runs while sample_view is still alive, i.e. between
/// requesting the sample and filling it
+ if (duration) {
+ std::this_thread::sleep_for(*duration);
+ }
/// sample_getter and the running sum are only touched under producer_mutex
+ std::lock_guard<std::mutex> lg(producer_mutex);
sample_getter(size, inputs, labels);
sum_from_producer += getSum(inputs, labels);
}
- virtual void consumeIteration() {
/**
 * @brief Launch produceSample(size, &duration) as a std::launch::async task.
 *
 * The lambda captures @a duration by value, so the pointer handed to
 * produceSample() refers to the copy stored inside the lambda.
 *
 * @param size forwarded to produceSample()
 * @param duration delay forwarded to produceSample()
 * @return std::future<void> future of the launched task
 */
+ virtual std::future<void>
+ produceSampleAfter(unsigned int size,
+ const std::chrono::milliseconds &duration) {
+ return std::async(std::launch::async, [this, size, duration] {
+ produceSample(size, &duration);
+ });
+ }
+
/**
 * @brief Launch a std::launch::async task that sleeps for @a duration and then
 * calls consumeIteration().
 *
 * @param duration time to sleep before consuming; captured by value
 * @return std::future<bool> future holding the result of consumeIteration()
 */
+ virtual std::future<bool>
+ consumeIterationAfter(const std::chrono::milliseconds &duration) {
+ return std::async(std::launch::async, [this, duration] {
+ std::this_thread::sleep_for(duration);
+ return consumeIteration();
+ });
+ }
+
/**
 * @brief Request one filled iteration from the queue and add the sum of its
 * inputs and labels to sum_from_consumer.
 *
 * @return false if the requested iteration view is empty, true after the
 * iteration has been accounted for
 */
+ virtual bool consumeIteration() {
auto iter_view = iq->requestFilled();
+ if (iter_view.isEmpty()) {
+ return false;
+ }
+
auto &iter = iter_view.get();
auto &inputs = iter.getInputsRef();
auto &labels = iter.getLabelsRef();
/// the running sum is only updated under consumer_mutex
+ std::lock_guard<std::mutex> lg(consumer_mutex);
sum_from_consumer += getSum(inputs, labels);
+ return true;
}
/**
return std::accumulate(labels.begin(), labels.end(), sum, accumulator);
}
- long double sum_from_producer;
- long double sum_from_consumer;
+ /** @todo the two sums currently differ by an error of around 1e-8; this
+ * should be changed to an integer based comparison */
+ long double
+ sum_from_producer; /**< sum of all the dataset from producer side */
+ long double
+ sum_from_consumer; /**< sum of all the dataset from consumer side */
+
/** producer_mutex is locked by produceSample() around sample_getter and the
 * update of sum_from_producer; consumer_mutex is locked by consumeIteration()
 * around the update of sum_from_consumer */
+ mutable std::mutex producer_mutex, consumer_mutex;
nntrainer::DataProducer::Generator_sample sample_getter; /**< sample generator returned by the producer's finalize_sample() */
std::unique_ptr<nntrainer::IterationQueue> iq; /**< iteration queue exercised by the tests */
std::vector<nntrainer::TensorDim> input_dims; /**< input dims */
/**
 * @brief Produce exactly iq->batch() samples from the calling thread and check
 * that the producer-side and consumer-side sums agree.
 */
TEST_P(IterQueueScenarios, produceAndConsumeSingle_p) {
auto batch_size = iq->batch();
-
for (unsigned int i = 0; i < batch_size; ++i) {
produceSample(i);
}
EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
}
/**
 * @brief Known data size, consumer running first.
 *
 * The producer task is delayed by one second (std::this_thread::sleep_for)
 * before it produces DATA_SIZE samples, while the consumer task immediately
 * starts requesting DATA_SIZE / iq->batch() iterations. After both tasks
 * finish, the producer-side and consumer-side sums must agree.
 */
+TEST_P(IterQueueScenarios,
+ produceAndConsumAsyncForDeterminedSizeConsumerRunningFirst_p) {
+
+ auto producer = std::async(std::launch::async, [this]() {
+ std::this_thread::sleep_for(1s);
+ for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+ produceSample(i);
+ }
+ });
+
+ auto consumer = std::async(std::launch::async, [this]() {
+ for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+ consumeIteration();
+ }
+ });
+
+ producer.get();
+ consumer.get();
+
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief Known data size, producer running first.
 *
 * The producer task immediately produces DATA_SIZE samples, while the consumer
 * task is delayed by one second (std::this_thread::sleep_for) before it
 * requests DATA_SIZE / iq->batch() iterations. After both tasks finish, the
 * producer-side and consumer-side sums must agree.
 */
+TEST_P(IterQueueScenarios,
+ produceAndConsumAsyncForDeterminedSizeProducerRunningFirst_p) {
+
+ auto producer = std::async(std::launch::async, [this]() {
+ for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+ produceSample(i);
+ }
+ });
+
+ auto consumer = std::async(std::launch::async, [this]() {
+ std::this_thread::sleep_for(1s);
+ for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+ consumeIteration();
+ }
+ });
+
+ producer.get();
+ consumer.get();
+
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief Unknown data size, producer running first.
 *
 * The producer task produces DATA_SIZE samples and then calls
 * notifyEndOfRequestEmpty(). The consumer task is delayed by one second
 * (std::this_thread::sleep_for) and then keeps consuming until
 * consumeIteration() returns false. After both tasks finish, the producer-side
 * and consumer-side sums must agree.
 */
+TEST_P(IterQueueScenarios,
+ produceAndConsumAsyncForUnknownSizeProducerRunningFirst_p) {
+ auto producer = std::async(std::launch::async, [this]() {
+ for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+ produceSample(i);
+ }
+ iq->notifyEndOfRequestEmpty();
+ });
+
+ auto consumer = std::async(std::launch::async, [this]() {
+ std::this_thread::sleep_for(1s);
+ while (consumeIteration()) {
+ }
+ });
+
+ producer.get();
+ consumer.get();
+
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief Unknown data size, consumer running first.
 *
 * The producer task is delayed by one second (std::this_thread::sleep_for),
 * then produces DATA_SIZE samples and calls notifyEndOfRequestEmpty(). The
 * consumer task immediately keeps consuming until consumeIteration() returns
 * false. After both tasks finish, the producer-side and consumer-side sums
 * must agree.
 */
+TEST_P(IterQueueScenarios,
+ produceAndConsumAsyncForUnknownSizeConsumerRunningFirst_p) {
+ auto producer = std::async(std::launch::async, [this]() {
+ std::this_thread::sleep_for(1s);
+ for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+ produceSample(i);
+ }
+ iq->notifyEndOfRequestEmpty();
+ });
+
+ auto consumer = std::async(std::launch::async, [this]() {
+ while (consumeIteration()) {
+ }
+ });
+
+ producer.get();
+ consumer.get();
+
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief Produce one sample less than a full batch, notify the end of
 * production and check that the partially filled iteration is still served.
 *
 * The consumer-side sum is accumulated only over the first it.batch() entries
 * of each tensor's per-batch sum. A further consumeIteration() must return
 * false. The test returns early when the batch size is one.
 */
+TEST_P(IterQueueScenarios, produceAndConsumPartiallyFilledBatch_p) {
+ auto b = iq->batch();
+ if (b == 1) {
+ return; /// if batch is one, there is no partially filled batch
+ }
+
/// sums the first actual_batch entries of t.sum_by_batch()
+ auto getSumOfPartiallyFilledTensor =
+ [](const nntrainer::Tensor &t, unsigned int actual_batch) -> long double {
+ long double sum = 0;
+ nntrainer::Tensor result = t.sum_by_batch();
+ for (unsigned int i = 0; i < actual_batch; ++i) {
+ sum += result.getValue(i, 0, 0, 0);
+ }
+ return sum;
+ };
+
/// fill b - 1 samples so that the batch stays incomplete
+ for (unsigned int i = 0; i < b - 1; ++i) {
+ produceSample(i);
+ }
+
+ iq->notifyEndOfRequestEmpty();
+ {
+ auto iter_view = iq->requestFilled();
+ if (iter_view.isEmpty()) {
+ throw std::invalid_argument("iter view is empty!");
+ }
+ nntrainer::Iteration &it = iter_view.get();
+
+ auto &inputs = it.getInputsRef();
+ auto &labels = it.getLabelsRef();
+
+ for (auto &input : inputs) {
+ sum_from_consumer += getSumOfPartiallyFilledTensor(input, it.batch());
+ }
+ for (auto &label : labels) {
+ sum_from_consumer += getSumOfPartiallyFilledTensor(label, it.batch());
+ }
+ }
+ EXPECT_FALSE(consumeIteration());
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+/**
+ * When notifyEndOfRequestEmpty() is called, the queue can be in one of four
+ * states.
+ *
+ * The queue is notified that there will be no more filling while...
+ * 1. the last buffer is already consumed and already marked as emptied
+ * 2. the last buffer has moved to filled_q.
+ * 3. the last buffer is being filled, and there are multiple buffers being
+ * filled
+ * 4. the last buffer is being served
+ *
+ */
+
/**
 * @brief notifyEndOfRequestEmpty() is called only after the consumer has
 * finished consuming DATA_SIZE / iq->batch() iterations.
 *
 * The producer produces DATA_SIZE samples and then waits on the consume_done
 * promise before notifying. The consumer fulfils the promise after its loop
 * and expects the next consumeIteration() to return false.
 */
+TEST_P(IterQueueScenarios, caseOneNotifyAfterConsumingIsFinished_p) {
+
+ std::promise<void> consume_done;
+ auto producer = std::async(std::launch::async, [this, &consume_done]() {
+ for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+ produceSample(i);
+ }
/// block until the consumer signals that its loop is finished
+ consume_done.get_future().get();
+ iq->notifyEndOfRequestEmpty();
+ });
+
+ auto consumer = std::async(std::launch::async, [this, &consume_done]() {
+ for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+ consumeIteration();
+ }
+ consume_done.set_value();
+ EXPECT_FALSE(consumeIteration());
+ });
+
+ producer.get();
+ consumer.get();
+
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief notifyEndOfRequestEmpty() is called after every producing task has
 * finished.
 *
 * iq->batch() * iq->slots() samples are produced by asynchronous tasks started
 * through produceSampleAfter() with a 50ms delay each. All futures are joined
 * before the notification. Afterwards iq->slots() iterations are expected to
 * be consumable, and the following consumeIteration() must return false.
 */
+TEST_P(IterQueueScenarios, caseTwoNotifyAfterTheLastBufferHasMovedToFilledQ_p) {
+ std::vector<std::future<void>> producer_result;
+
+ unsigned int number_of_producing = iq->batch() * iq->slots();
+ producer_result.reserve(number_of_producing);
+ for (unsigned int i = 0; i < number_of_producing; ++i) {
+ producer_result.push_back(produceSampleAfter(i, 50ms));
+ }
+
/// wait for every producing task before notifying
+ for (auto &fut : producer_result) {
+ fut.get();
+ }
+ iq->notifyEndOfRequestEmpty();
+
+ for (unsigned int i = 0; i < iq->slots(); ++i) {
+ EXPECT_TRUE(consumeIteration());
+ }
+ EXPECT_FALSE(consumeIteration());
+
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief notifyEndOfRequestEmpty() is called while the sample views are still
 * alive.
 *
 * iq->batch() * iq->slots() sample views are requested, filled and kept in
 * scoped_views. The notification is started on another thread, and the views
 * are destroyed about 500ms later when the inner scope ends. Afterwards
 * iq->slots() iterations are expected to be consumable, and the following
 * consumeIteration() must return false.
 */
+TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) {
+ std::future<void> notify_result;
+ {
+ std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
+ unsigned int number_of_producing = iq->batch() * iq->slots();
+ for (unsigned int i = 0; i < number_of_producing; ++i) {
+ scoped_views.push(iq->requestEmpty());
+ if (scoped_views.back().isEmpty()) {
+ throw std::runtime_error("sample was empty");
+ }
+ auto &sample = scoped_views.back().get();
+ auto &inputs = sample.getInputsRef();
+ auto &labels = sample.getLabelsRef();
+ sample_getter(i, inputs, labels);
+ sum_from_producer += getSum(inputs, labels);
+ }
+
+ notify_result =
+ std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
+ std::this_thread::sleep_for(500ms);
+ /// delaying destroying scoped_views to simulate samples are in
+ /// the state of being filled
+ }
+ notify_result.get();
+
+ for (unsigned int i = 0; i < iq->slots(); ++i) {
+ EXPECT_TRUE(consumeIteration());
+ }
+ EXPECT_FALSE(consumeIteration());
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
/**
 * @brief notifyEndOfRequestEmpty() is called while the last filled iteration
 * is still being served.
 *
 * iq->batch() * iq->slots() samples are produced, and all but one iteration
 * are consumed through consumeIteration(). The last iteration view is
 * requested directly and kept alive for about 500ms while the notification
 * runs on another thread. While that view is held, consumeIteration() must
 * return false.
 *
 * @throws std::invalid_argument if the last iteration view is empty
 */
+TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) {
+ std::future<void> notify_result;
+ unsigned int number_of_producing = iq->batch() * iq->slots();
+
+ for (unsigned int i = 0; i < number_of_producing; ++i) {
+ produceSample(i);
+ }
+
+ for (unsigned int i = 0; i < iq->slots() - 1; ++i) {
+ EXPECT_TRUE(consumeIteration());
+ }
+ {
+ auto iter_view = iq->requestFilled();
+ notify_result =
+ std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
+ if (iter_view.isEmpty()) {
+ throw std::invalid_argument("iter_view is empty!");
+ }
+ auto &iter = iter_view.get();
+ auto &inputs = iter.getInputsRef();
+ auto &labels = iter.getLabelsRef();
+ sum_from_consumer += getSum(inputs, labels);
+ EXPECT_FALSE(consumeIteration());
+
+ std::this_thread::sleep_for(500ms);
+ /// delay here to delay destroying iter_view to simulate
+ /// notifyEndOfRequestEmpty() is being called during destroying the last
+ /// buffer
+ }
+
+ notify_result.get();
+ EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+/** @todo add a test that simulates multiple workers, for now this is impossible
+ * because we don't have thread safe worker */
+
/**
 * @brief Negative test: a second notifyEndOfRequestEmpty() call is expected to
 * throw.
 */
+TEST_P(IterQueueScenarios, notifyEndTwice_n) {
+ iq->notifyEndOfRequestEmpty();
+ EXPECT_ANY_THROW(iq->notifyEndOfRequestEmpty());
+}
+
/**
 * @brief Negative test: requestEmpty() is expected to throw once
 * notifyEndOfRequestEmpty() has been called.
 */
+TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) {
+ iq->notifyEndOfRequestEmpty();
+ EXPECT_ANY_THROW(iq->requestEmpty());
+}
+
/**
 * @brief Placeholder for a negative test with an empty body; the DISABLED_
 * prefix keeps GoogleTest from running it by default.
 */
+TEST_P(IterQueueScenarios,
+ DISABLED_ScopedViewSampleHandlesThrowWhileFillingFails_n) {
+ /// NYI (not yet implemented)
+}
+
/**
 * @brief Placeholder for a negative test with an empty body; the DISABLED_
 * prefix keeps GoogleTest from running it by default.
 */
+TEST_P(IterQueueScenarios,
+ DISABLED_ScopedViewIterationHandlesThrowWhileFillingFails_n) {
+ /// NYI (not yet implemented)
+}
+
IterQueueTestParamType multi_slot_multi_batch = {
4 /** queue size */,
{{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,