[itq] Add multiple mt itq test
authorJihoon Lee <jhoon.it.lee@samsung.com>
Wed, 18 Aug 2021 05:21:23 +0000 (14:21 +0900)
committerJijoong Moon <jijoong.moon@samsung.com>
Fri, 27 Aug 2021 11:44:58 +0000 (20:44 +0900)
This patch adds multiple test based on scenarios

**Self evaluation:**
1. Build test: [X]Passed [ ]Failed [ ]Skipped
2. Run test: [X]Passed [ ]Failed [ ]Skipped

Signed-off-by: Jihoon Lee <jhoon.it.lee@samsung.com>
nntrainer/tensor/tensor.cpp
nntrainer/tensor/tensor.h
test/unittest/datasets/unittest_batch_queue.cpp

index e2bfb1f..31c2916 100644 (file)
@@ -644,7 +644,7 @@ void Tensor::apply_broadcast_util(
  * This is to sum the Tensor data according to the dim.batch().
  * Therefore the result has M(dim.batch(), 1, 1, 1) dimension.
  */
-Tensor Tensor::sum_by_batch() {
+Tensor Tensor::sum_by_batch() const {
   Tensor ret(dim.batch(), 1, 1, 1);
   unsigned int feat_len = dim.getFeatureLen();
   unsigned int batch = dim.batch();
index 3a12558..b42fa16 100644 (file)
@@ -547,7 +547,7 @@ public:
    * @brief     sum all the Tensor elements according to the batch
    * @retval    Calculated Tensor(batch, 1, 1, 1)
    */
-  Tensor sum_by_batch();
+  Tensor sum_by_batch() const;
 
   /**
    * @brief     sum all the Tensor elements according to the axis
index 0c76d70..9e1d7ee 100644 (file)
 #include <tuple>
 #include <vector>
 
+using namespace std::chrono_literals;
+
+/** THIS number can be divided with 3 and 4 is multiple of current multiple
+ * batch scenario setup, as a @todo this should be better part of a
+ * IterQueueTestParamType */
+static constexpr unsigned int DATA_SIZE = 384u;
+
 nntrainer::DataProducer::Iteration data(size_t key) {
   return {true, std::vector<nntrainer::Tensor>(key), {}};
 };
@@ -133,27 +140,60 @@ public:
     iq = std::make_unique<nntrainer::IterationQueue>(q_size, input_dims,
                                                      label_dims);
     auto producer = std::make_unique<nntrainer::RandomDataOneHotProducer>();
-    producer->setProperty({"num_samples=10000"});
+    producer->setProperty({"num_samples=512"});
     sample_getter = producer->finalize_sample(input_dims, label_dims);
+    this->input_dims = input_dims;
+    this->label_dims = label_dims;
     sum_from_producer = 0;
     sum_from_consumer = 0;
   }
 
-  virtual void produceSample(unsigned int size) {
+  virtual void
+  produceSample(unsigned int size,
+                const std::chrono::milliseconds *duration = nullptr) {
     auto sample_view = iq->requestEmpty();
+    if (sample_view.isEmpty()) {
+      throw std::runtime_error("sample_view is empty!");
+    }
     auto &sample = sample_view.get();
     auto &inputs = sample.getInputsRef();
     auto &labels = sample.getLabelsRef();
+    if (duration) {
+      std::this_thread::sleep_for(*duration);
+    }
+    std::lock_guard<std::mutex> lg(producer_mutex);
     sample_getter(size, inputs, labels);
     sum_from_producer += getSum(inputs, labels);
   }
 
-  virtual void consumeIteration() {
+  virtual std::future<void>
+  produceSampleAfter(unsigned int size,
+                     const std::chrono::milliseconds &duration) {
+    return std::async(std::launch::async, [this, size, duration] {
+      produceSample(size, &duration);
+    });
+  }
+
+  virtual std::future<bool>
+  consumeIterationAfter(const std::chrono::milliseconds &duration) {
+    return std::async(std::launch::async, [this, duration] {
+      std::this_thread::sleep_for(duration);
+      return consumeIteration();
+    });
+  }
+
+  virtual bool consumeIteration() {
     auto iter_view = iq->requestFilled();
+    if (iter_view.isEmpty()) {
+      return false;
+    }
+
     auto &iter = iter_view.get();
     auto &inputs = iter.getInputsRef();
     auto &labels = iter.getLabelsRef();
+    std::lock_guard<std::mutex> lg(consumer_mutex);
     sum_from_consumer += getSum(inputs, labels);
+    return true;
   }
 
   /**
@@ -182,8 +222,14 @@ protected:
     return std::accumulate(labels.begin(), labels.end(), sum, accumulator);
   }
 
-  long double sum_from_producer;
-  long double sum_from_consumer;
+  /** @todo currently there is an error between those two around 1e-8, this
+   * should be changed to integer based comparison */
+  long double
+    sum_from_producer; /**< sum of all the dataset from producer side*/
+  long double
+    sum_from_consumer; /**< sum of all the dataset from consumer side */
+
+  mutable std::mutex producer_mutex, consumer_mutex;
   nntrainer::DataProducer::Generator_sample sample_getter;
   std::unique_ptr<nntrainer::IterationQueue> iq;
   std::vector<nntrainer::TensorDim> input_dims; /**< input dims */
@@ -192,7 +238,6 @@ protected:
 
 TEST_P(IterQueueScenarios, produceAndConsumeSingle_p) {
   auto batch_size = iq->batch();
-
   for (unsigned int i = 0; i < batch_size; ++i) {
     produceSample(i);
   }
@@ -269,6 +314,284 @@ TEST_P(IterQueueScenarios, produceAndConsumeSyncMixed_p) {
   EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
 }
 
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForDeterminedSizeConsumerRunningFirst_p) {
+
+  auto producer = std::async(std::launch::async, [this]() {
+    sleep(1);
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+      consumeIteration();
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForDeterminedSizeProducerRunningFirst_p) {
+
+  auto producer = std::async(std::launch::async, [this]() {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    sleep(1);
+    for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+      consumeIteration();
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForUnknownSizeProducerRunningFirst_p) {
+  auto producer = std::async(std::launch::async, [this]() {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+    iq->notifyEndOfRequestEmpty();
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    sleep(1);
+    while (consumeIteration()) {
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForUnknownSizeConsumerRunningFirst_p) {
+  auto producer = std::async(std::launch::async, [this]() {
+    sleep(1);
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+    iq->notifyEndOfRequestEmpty();
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    while (consumeIteration()) {
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, produceAndConsumPartiallyFilledBatch_p) {
+  auto b = iq->batch();
+  if (b == 1) {
+    return; /// if batch is one, there is no partially filled batch
+  }
+
+  auto getSumOfPartiallyFilledTensor =
+    [](const nntrainer::Tensor &t, unsigned int actual_batch) -> long double {
+    long double sum = 0;
+    nntrainer::Tensor result = t.sum_by_batch();
+    for (unsigned int i = 0; i < actual_batch; ++i) {
+      sum += result.getValue(i, 0, 0, 0);
+    }
+    return sum;
+  };
+
+  for (unsigned int i = 0; i < b - 1; ++i) {
+    produceSample(i);
+  }
+
+  iq->notifyEndOfRequestEmpty();
+  {
+    auto iter_view = iq->requestFilled();
+    if (iter_view.isEmpty()) {
+      throw std::invalid_argument("iter view is empty!");
+    }
+    nntrainer::Iteration &it = iter_view.get();
+
+    auto &inputs = it.getInputsRef();
+    auto &labels = it.getLabelsRef();
+
+    for (auto &input : inputs) {
+      sum_from_consumer += getSumOfPartiallyFilledTensor(input, it.batch());
+    }
+    for (auto &label : labels) {
+      sum_from_consumer += getSumOfPartiallyFilledTensor(label, it.batch());
+    }
+  }
+  EXPECT_FALSE(consumeIteration());
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+/**
+ * When calling notifytEndOfRequestEmpty(), there are four possible states the
+ * queue is in.
+ *
+ * notified there will be no filling while...
+ * 1. the last buffer is already consumed and already marked as emptied
+ * 2. the last buffer has moved to filled_q.
+ * 3. the last buffer is being filled, and there are multiple buffers being
+ * filled
+ * 4. the last buffer is being served
+ *
+ */
+
+TEST_P(IterQueueScenarios, caseOneNotifyAfterConsumingIsFinished_p) {
+
+  std::promise<void> consume_done;
+  auto producer = std::async(std::launch::async, [this, &consume_done]() {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+    consume_done.get_future().get();
+    iq->notifyEndOfRequestEmpty();
+  });
+
+  auto consumer = std::async(std::launch::async, [this, &consume_done]() {
+    for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+      consumeIteration();
+    }
+    consume_done.set_value();
+    EXPECT_FALSE(consumeIteration());
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, caseTwoNotifyAfterTheLastBufferHasMovedToFilledQ_p) {
+  std::vector<std::future<void>> producer_result;
+
+  unsigned int number_of_producing = iq->batch() * iq->slots();
+  producer_result.reserve(number_of_producing);
+  for (unsigned int i = 0; i < number_of_producing; ++i) {
+    producer_result.push_back(produceSampleAfter(i, 50ms));
+  }
+
+  for (auto &fut : producer_result) {
+    fut.get();
+  }
+  iq->notifyEndOfRequestEmpty();
+
+  for (unsigned int i = 0; i < iq->slots(); ++i) {
+    EXPECT_TRUE(consumeIteration());
+  }
+  EXPECT_FALSE(consumeIteration());
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) {
+  std::future<void> notify_result;
+  {
+    std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
+    unsigned int number_of_producing = iq->batch() * iq->slots();
+    for (unsigned int i = 0; i < number_of_producing; ++i) {
+      scoped_views.push(iq->requestEmpty());
+      if (scoped_views.back().isEmpty()) {
+        throw std::runtime_error("sample was empty");
+      }
+      auto &sample = scoped_views.back().get();
+      auto &inputs = sample.getInputsRef();
+      auto &labels = sample.getLabelsRef();
+      sample_getter(i, inputs, labels);
+      sum_from_producer += getSum(inputs, labels);
+    }
+
+    notify_result =
+      std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
+    std::this_thread::sleep_for(500ms);
+    /// delaying destroying scoped_views to simulate samples are in
+    /// the state of being filled
+  }
+  notify_result.get();
+
+  for (unsigned int i = 0; i < iq->slots(); ++i) {
+    EXPECT_TRUE(consumeIteration());
+  }
+  EXPECT_FALSE(consumeIteration());
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) {
+  std::future<void> notify_result;
+  unsigned int number_of_producing = iq->batch() * iq->slots();
+
+  std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
+  for (unsigned int i = 0; i < number_of_producing; ++i) {
+    produceSample(i);
+  }
+
+  for (unsigned int i = 0; i < iq->slots() - 1; ++i) {
+    EXPECT_TRUE(consumeIteration());
+  }
+  {
+    auto iter_view = iq->requestFilled();
+    notify_result =
+      std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
+    if (iter_view.isEmpty()) {
+      throw std::invalid_argument("iter_view is empty!");
+    }
+    auto &iter = iter_view.get();
+    auto &inputs = iter.getInputsRef();
+    auto &labels = iter.getLabelsRef();
+    sum_from_consumer += getSum(inputs, labels);
+    EXPECT_FALSE(consumeIteration());
+
+    std::this_thread::sleep_for(500ms);
+    /// delay here to delay destroying iter_view to simulate
+    /// notifyEndOfRequestEmpty() is being called during destroying the last
+    /// buffer
+  }
+
+  notify_result.get();
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+/** @todo add a test that simulates multiple workers, for now this is impossible
+ * because we don't have thread safe worker */
+
+TEST_P(IterQueueScenarios, notifyEndTwice_n) {
+  iq->notifyEndOfRequestEmpty();
+  EXPECT_ANY_THROW(iq->notifyEndOfRequestEmpty());
+}
+
+TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) {
+  iq->notifyEndOfRequestEmpty();
+  EXPECT_ANY_THROW(iq->requestEmpty());
+}
+
+TEST_P(IterQueueScenarios,
+       DISABLED_ScopedViewSampleHandlesThrowWhileFillingFails_n) {
+  /// NYI
+}
+
+TEST_P(IterQueueScenarios,
+       DISABLED_ScopedViewIterationHandlesThrowWhileFillingFails_n) {
+  /// NYI
+}
+
 IterQueueTestParamType multi_slot_multi_batch = {
   4 /** queue size */,
   {{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,