From: Jihoon Lee Date: Wed, 18 Aug 2021 05:21:23 +0000 (+0900) Subject: [itq] Add multiple mt itq test X-Git-Tag: accepted/tizen/unified/20210829.234903~8 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=f2d9de5b34e1af854193769a847862f504293554;p=platform%2Fcore%2Fml%2Fnntrainer.git [itq] Add multiple mt itq test This patch adds multiple test based on scenarios **Self evaluation:** 1. Build test: [X]Passed [ ]Failed [ ]Skipped 2. Run test: [X]Passed [ ]Failed [ ]Skipped Signed-off-by: Jihoon Lee --- diff --git a/nntrainer/tensor/tensor.cpp b/nntrainer/tensor/tensor.cpp index e2bfb1f..31c2916 100644 --- a/nntrainer/tensor/tensor.cpp +++ b/nntrainer/tensor/tensor.cpp @@ -644,7 +644,7 @@ void Tensor::apply_broadcast_util( * This is to sum the Tensor data according to the dim.batch(). * Therefore the result has M(dim.batch(), 1, 1, 1) dimension. */ -Tensor Tensor::sum_by_batch() { +Tensor Tensor::sum_by_batch() const { Tensor ret(dim.batch(), 1, 1, 1); unsigned int feat_len = dim.getFeatureLen(); unsigned int batch = dim.batch(); diff --git a/nntrainer/tensor/tensor.h b/nntrainer/tensor/tensor.h index 3a12558..b42fa16 100644 --- a/nntrainer/tensor/tensor.h +++ b/nntrainer/tensor/tensor.h @@ -547,7 +547,7 @@ public: * @brief sum all the Tensor elements according to the batch * @retval Calculated Tensor(batch, 1, 1, 1) */ - Tensor sum_by_batch(); + Tensor sum_by_batch() const; /** * @brief sum all the Tensor elements according to the axis diff --git a/test/unittest/datasets/unittest_batch_queue.cpp b/test/unittest/datasets/unittest_batch_queue.cpp index 0c76d70..9e1d7ee 100644 --- a/test/unittest/datasets/unittest_batch_queue.cpp +++ b/test/unittest/datasets/unittest_batch_queue.cpp @@ -22,6 +22,13 @@ #include #include +using namespace std::chrono_literals; + +/** THIS number can be divided with 3 and 4 is multiple of current multiple + * batch scenario setup, as a @todo this should be better part of a + * IterQueueTestParamType */ +static constexpr unsigned int DATA_SIZE = 384u; + nntrainer::DataProducer::Iteration data(size_t key) { return {true, std::vector(key), {}}; }; @@ -133,27 +140,60 @@ public: iq = std::make_unique(q_size, input_dims, label_dims); auto producer = std::make_unique(); - producer->setProperty({"num_samples=10000"}); + producer->setProperty({"num_samples=512"}); sample_getter = producer->finalize_sample(input_dims, label_dims); + this->input_dims = input_dims; + this->label_dims = label_dims; sum_from_producer = 0; sum_from_consumer = 0; } - virtual void produceSample(unsigned int size) { + virtual void + produceSample(unsigned int size, + const std::chrono::milliseconds *duration = nullptr) { auto sample_view = iq->requestEmpty(); + if (sample_view.isEmpty()) { + throw std::runtime_error("sample_view is empty!"); + } auto &sample = sample_view.get(); auto &inputs = sample.getInputsRef(); auto &labels = sample.getLabelsRef(); + if (duration) { + std::this_thread::sleep_for(*duration); + } + std::lock_guard lg(producer_mutex); sample_getter(size, inputs, labels); sum_from_producer += getSum(inputs, labels); } - virtual void consumeIteration() { + virtual std::future + produceSampleAfter(unsigned int size, + const std::chrono::milliseconds &duration) { + return std::async(std::launch::async, [this, size, duration] { + produceSample(size, &duration); + }); + } + + virtual std::future + consumeIterationAfter(const std::chrono::milliseconds &duration) { + return std::async(std::launch::async, [this, duration] { + std::this_thread::sleep_for(duration); + return consumeIteration(); + }); + } + + virtual bool consumeIteration() { auto iter_view = iq->requestFilled(); + if (iter_view.isEmpty()) { + return false; + } + auto &iter = iter_view.get(); auto &inputs = iter.getInputsRef(); auto &labels = iter.getLabelsRef(); + std::lock_guard lg(consumer_mutex); sum_from_consumer += getSum(inputs, labels); + return true; } /** @@ -182,8 +222,14 @@ protected: return std::accumulate(labels.begin(), labels.end(), sum, accumulator); } - long double sum_from_producer; - long double sum_from_consumer; + /** @todo currently there is an error between those two around 1e-8, this + * should be changed to integer based comparison */ + long double + sum_from_producer; /**< sum of all the dataset from producer side*/ + long double + sum_from_consumer; /**< sum of all the dataset from consumer side */ + + mutable std::mutex producer_mutex, consumer_mutex; nntrainer::DataProducer::Generator_sample sample_getter; std::unique_ptr iq; std::vector input_dims; /**< input dims */ @@ -192,7 +238,6 @@ protected: TEST_P(IterQueueScenarios, produceAndConsumeSingle_p) { auto batch_size = iq->batch(); - for (unsigned int i = 0; i < batch_size; ++i) { produceSample(i); } @@ -269,6 +314,284 @@ TEST_P(IterQueueScenarios, produceAndConsumeSyncMixed_p) { EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); } +TEST_P(IterQueueScenarios, + produceAndConsumAsyncForDeterminedSizeConsumerRunningFirst_p) { + + auto producer = std::async(std::launch::async, [this]() { + sleep(1); + for (unsigned int i = 0u; i < DATA_SIZE; ++i) { + produceSample(i); + } + }); + + auto consumer = std::async(std::launch::async, [this]() { + for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) { + consumeIteration(); + } + }); + + producer.get(); + consumer.get(); + + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, + produceAndConsumAsyncForDeterminedSizeProducerRunningFirst_p) { + + auto producer = std::async(std::launch::async, [this]() { + for (unsigned int i = 0u; i < DATA_SIZE; ++i) { + produceSample(i); + } + }); + + auto consumer = std::async(std::launch::async, [this]() { + sleep(1); + for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) { + consumeIteration(); + } + }); + + producer.get(); + consumer.get(); + + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, + produceAndConsumAsyncForUnknownSizeProducerRunningFirst_p) { + auto producer = std::async(std::launch::async, [this]() { + for (unsigned int i = 0u; i < DATA_SIZE; ++i) { + produceSample(i); + } + iq->notifyEndOfRequestEmpty(); + }); + + auto consumer = std::async(std::launch::async, [this]() { + sleep(1); + while (consumeIteration()) { + } + }); + + producer.get(); + consumer.get(); + + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, + produceAndConsumAsyncForUnknownSizeConsumerRunningFirst_p) { + auto producer = std::async(std::launch::async, [this]() { + sleep(1); + for (unsigned int i = 0u; i < DATA_SIZE; ++i) { + produceSample(i); + } + iq->notifyEndOfRequestEmpty(); + }); + + auto consumer = std::async(std::launch::async, [this]() { + while (consumeIteration()) { + } + }); + + producer.get(); + consumer.get(); + + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, produceAndConsumPartiallyFilledBatch_p) { + auto b = iq->batch(); + if (b == 1) { + return; /// if batch is one, there is no partially filled batch + } + + auto getSumOfPartiallyFilledTensor = + [](const nntrainer::Tensor &t, unsigned int actual_batch) -> long double { + long double sum = 0; + nntrainer::Tensor result = t.sum_by_batch(); + for (unsigned int i = 0; i < actual_batch; ++i) { + sum += result.getValue(i, 0, 0, 0); + } + return sum; + }; + + for (unsigned int i = 0; i < b - 1; ++i) { + produceSample(i); + } + + iq->notifyEndOfRequestEmpty(); + { + auto iter_view = iq->requestFilled(); + if (iter_view.isEmpty()) { + throw std::invalid_argument("iter view is empty!"); + } + nntrainer::Iteration &it = iter_view.get(); + + auto &inputs = it.getInputsRef(); + auto &labels = it.getLabelsRef(); + + for (auto &input : inputs) { + sum_from_consumer += getSumOfPartiallyFilledTensor(input, it.batch()); + } + for (auto &label : labels) { + sum_from_consumer += getSumOfPartiallyFilledTensor(label, it.batch()); + } + } + EXPECT_FALSE(consumeIteration()); + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +/** + * When calling notifytEndOfRequestEmpty(), there are four possible states the + * queue is in. + * + * notified there will be no filling while... + * 1. the last buffer is already consumed and already marked as emptied + * 2. the last buffer has moved to filled_q. + * 3. the last buffer is being filled, and there are multiple buffers being + * filled + * 4. the last buffer is being served + * + */ + +TEST_P(IterQueueScenarios, caseOneNotifyAfterConsumingIsFinished_p) { + + std::promise consume_done; + auto producer = std::async(std::launch::async, [this, &consume_done]() { + for (unsigned int i = 0u; i < DATA_SIZE; ++i) { + produceSample(i); + } + consume_done.get_future().get(); + iq->notifyEndOfRequestEmpty(); + }); + + auto consumer = std::async(std::launch::async, [this, &consume_done]() { + for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) { + consumeIteration(); + } + consume_done.set_value(); + EXPECT_FALSE(consumeIteration()); + }); + + producer.get(); + consumer.get(); + + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, caseTwoNotifyAfterTheLastBufferHasMovedToFilledQ_p) { + std::vector> producer_result; + + unsigned int number_of_producing = iq->batch() * iq->slots(); + producer_result.reserve(number_of_producing); + for (unsigned int i = 0; i < number_of_producing; ++i) { + producer_result.push_back(produceSampleAfter(i, 50ms)); + } + + for (auto &fut : producer_result) { + fut.get(); + } + iq->notifyEndOfRequestEmpty(); + + for (unsigned int i = 0; i < iq->slots(); ++i) { + EXPECT_TRUE(consumeIteration()); + } + EXPECT_FALSE(consumeIteration()); + + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) { + std::future notify_result; + { + std::queue> scoped_views; + unsigned int number_of_producing = iq->batch() * iq->slots(); + for (unsigned int i = 0; i < number_of_producing; ++i) { + scoped_views.push(iq->requestEmpty()); + if (scoped_views.back().isEmpty()) { + throw std::runtime_error("sample was empty"); + } + auto &sample = scoped_views.back().get(); + auto &inputs = sample.getInputsRef(); + auto &labels = sample.getLabelsRef(); + sample_getter(i, inputs, labels); + sum_from_producer += getSum(inputs, labels); + } + + notify_result = + std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); }); + std::this_thread::sleep_for(500ms); + /// delaying destroying scoped_views to simulate samples are in + /// the state of being filled + } + notify_result.get(); + + for (unsigned int i = 0; i < iq->slots(); ++i) { + EXPECT_TRUE(consumeIteration()); + } + EXPECT_FALSE(consumeIteration()); + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) { + std::future notify_result; + unsigned int number_of_producing = iq->batch() * iq->slots(); + + std::queue> scoped_views; + for (unsigned int i = 0; i < number_of_producing; ++i) { + produceSample(i); + } + + for (unsigned int i = 0; i < iq->slots() - 1; ++i) { + EXPECT_TRUE(consumeIteration()); + } + { + auto iter_view = iq->requestFilled(); + notify_result = + std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); }); + if (iter_view.isEmpty()) { + throw std::invalid_argument("iter_view is empty!"); + } + auto &iter = iter_view.get(); + auto &inputs = iter.getInputsRef(); + auto &labels = iter.getLabelsRef(); + sum_from_consumer += getSum(inputs, labels); + EXPECT_FALSE(consumeIteration()); + + std::this_thread::sleep_for(500ms); + /// delay here to delay destroying iter_view to simulate + /// notifyEndOfRequestEmpty() is being called during destroying the last + /// buffer + } + + notify_result.get(); + EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer); +} + +/** @todo add a test that simulates multiple workers, for now this is impossible + * because we don't have thread safe worker */ + +TEST_P(IterQueueScenarios, notifyEndTwice_n) { + iq->notifyEndOfRequestEmpty(); + EXPECT_ANY_THROW(iq->notifyEndOfRequestEmpty()); +} + +TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) { + iq->notifyEndOfRequestEmpty(); + EXPECT_ANY_THROW(iq->requestEmpty()); +} + +TEST_P(IterQueueScenarios, + DISABLED_ScopedViewSampleHandlesThrowWhileFillingFails_n) { + /// NYI +} + +TEST_P(IterQueueScenarios, + DISABLED_ScopedViewIterationHandlesThrowWhileFillingFails_n) { + /// NYI +} + IterQueueTestParamType multi_slot_multi_batch = { 4 /** queue size */, {{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,