[Dataset] Handle queue edge cases
author Jihoon Lee <jhoon.it.lee@samsung.com>
Wed, 18 Aug 2021 12:50:56 +0000 (21:50 +0900)
committer Jijoong Moon <jijoong.moon@samsung.com>
Fri, 27 Aug 2021 11:44:58 +0000 (20:44 +0900)
Handle queue edge cases where a ScopedView runs into an exception: when a
view is destroyed during stack unwinding, the new on_error callback rolls
the in-flight sample or iteration back to the queue instead of leaving the
queue in a broken state.

**Self evaluation:**
1. Build test: [X]Passed [ ]Failed [ ]Skipped
2. Run test: [X]Passed [ ]Failed [ ]Skipped

Signed-off-by: Jihoon Lee <jhoon.it.lee@samsung.com>
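
The crux of the patch is the new on_error callback on ScopedView: when the
view is destroyed while an exception is unwinding the stack, the queue rolls
the in-flight sample or iteration back instead of counting it as filled or
served. A minimal, self-contained sketch of that pattern (illustrative only;
the real nntrainer::ScopedView appears in batch_queue.h below):

// Sketch of the RAII commit/rollback pattern this patch completes.
#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>

template <typename T> class ScopedViewSketch {
public:
  ScopedViewSketch(T *data_, std::function<void()> on_notify_ = nullptr,
                   std::function<void()> on_error_ = nullptr) :
    data(data_), on_notify(std::move(on_notify_)),
    on_error(std::move(on_error_)) {}

  ~ScopedViewSketch() {
    // a positive count means this destructor runs during stack unwinding,
    // so roll back (on_error) instead of committing (on_notify)
    if (std::uncaught_exceptions()) {
      if (on_error)
        on_error();
    } else if (on_notify) {
      on_notify();
    }
  }

  T &get() { return *data; }

private:
  T *data;
  std::function<void()> on_notify, on_error;
};

int main() {
  int sample = 0;
  try {
    ScopedViewSketch<int> view(
      &sample, [] { std::cout << "sample marked filled\n"; },
      [] { std::cout << "sample returned to the empty queue\n"; });
    throw std::runtime_error("filling failed"); // destructor sees unwinding
  } catch (const std::exception &) {
  }
  // prints: sample returned to the empty queue
}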
nntrainer/dataset/batch_queue.cpp
nntrainer/dataset/batch_queue.h
nntrainer/dataset/data_iteration.cpp
nntrainer/dataset/data_iteration.h
test/unittest/datasets/unittest_batch_queue.cpp
test/unittest/datasets/unittest_iteration_queue.cpp [new file with mode: 0644]

diff --git a/nntrainer/dataset/batch_queue.cpp b/nntrainer/dataset/batch_queue.cpp
index 17733552cee892295c1f45e1a3a62f6a13840025..8deb361d9c5e4c8990ead10915f9123c206ef325 100644
@@ -84,6 +84,7 @@ IterationQueue::IterationQueue(
     iterations.emplace_back(input_dims, label_dims, this);
     empty_q.push(&iterations.back());
   }
+  batch_size = iterations.front().get().batch();
 }
 
 IterationQueue::~IterationQueue() {
@@ -101,12 +102,15 @@ IterationQueue::~IterationQueue() {
 
 ScopedView<Sample> IterationQueue::requestEmpty() {
   std::scoped_lock lg(empty_mutex);
-  NNTR_THROW_IF(flow_state != FlowState::FLOW_STATE_OPEN, std::invalid_argument)
-    << "Calling requestEmpty() after notifyEndOfRequestEmpty() breaks "
-       "invariant";
+  auto current_flow_state = flow_state.load();
+  NNTR_THROW_IF(current_flow_state != FlowState::FLOW_STATE_OPEN,
+                std::invalid_argument)
+    << "the queue expect state of "
+    << static_cast<unsigned>(FlowState::FLOW_STATE_OPEN) << " but met "
+    << static_cast<unsigned>(current_flow_state);
 
   /// below is useful information when debugging iteration queue, but there will
-  /// be to much log if we turn the log on. so leaving it as a comment for now.
+  /// be too much log if we turn the log on. so leaving it as a comment for now.
   // std::cout << "[requestEmpty] empty_q.size(): " << empty_q.size()
   // << " being_filled: " << num_being_filled
   // << " filled_q.size():  " << filled_q.size() << '\n';
@@ -114,16 +118,24 @@ ScopedView<Sample> IterationQueue::requestEmpty() {
   if (being_filled == nullptr ||
       current_iterator + 1 == being_filled->get().end()) {
     being_filled = empty_q.waitAndPop();
+    being_filled->reset();
     num_being_filled++;
     current_iterator = being_filled->get().begin();
   } else {
     current_iterator++;
   }
 
-  auto view = ScopedView<Sample>(&(*current_iterator),
-                                 [current_being_filed = this->being_filled] {
-                                   current_being_filed->markSampleFilled();
-                                 });
+  auto view =
+    ScopedView<Sample>(&(*current_iterator),
+                       [current_being_filled = this->being_filled] {
+                         current_being_filled->markSampleFilled();
+                       },
+                       [this, current_being_filled = this->being_filled] {
+                         std::unique_lock lg(empty_mutex);
+                         this->markEmpty(current_being_filled);
+                         num_being_filled--;
+                         notify_emptied_cv.notify_all();
+                       });
   return view;
 }
 
@@ -131,44 +143,52 @@ ScopedView<Iteration> IterationQueue::requestFilled() {
   std::scoped_lock lock(filled_mutex);
 
   /// below is useful information when debugging iteration queue, but there will
-  /// be to much log if we turn the log on. so leaving it as a comment for now.
+  /// be too much log if we turn the log on. so leaving it as a comment for now.
   // std::cout << "[requestFilled] empty_q.size(): " << empty_q.size()
   // << " num being filled: " << num_being_filled
   // << " filled_q.size(): " << filled_q.size() << '\n';
-  if (flow_state == FlowState::FLOW_STATE_STOPPED) {
+  if (flow_state.load() == FlowState::FLOW_STATE_STOPPED) {
     return ScopedView<Iteration>(nullptr);
   }
 
   auto iteration = filled_q.waitAndPop();
   if (iteration == nullptr) {
-    NNTR_THROW_IF(flow_state != FlowState::FLOW_STATE_STOP_REQUESTED,
-                  std::runtime_error)
+    auto stop_request_state = FlowState::FLOW_STATE_STOP_REQUESTED;
+    bool exchange_result = flow_state.compare_exchange_strong(
+      stop_request_state, FlowState::FLOW_STATE_STOPPED);
+    NNTR_THROW_IF(!exchange_result, std::runtime_error)
       << "the queue has either already stopped or running, but trying stopping "
          "without requesting stop, queue size: "
       << iterations.size() << " num currently empty: " << empty_q.size()
       << " num being filled: " << num_being_filled
       << " filled_q.size(): " << filled_q.size();
 
-    flow_state = FlowState::FLOW_STATE_STOPPED;
     return ScopedView<Iteration>(nullptr);
   }
 
-  return ScopedView<Iteration>(&iteration->get(),
-                               [this, iteration] { markEmpty(iteration); });
+  return ScopedView<Iteration>(
+    &iteration->get(), [this, iteration] { markEmpty(iteration); },
+    [this, iteration] {
+      std::unique_lock lock(filled_mutex);
+      flow_state.store(FlowState::FLOW_STATE_STOPPED);
+      markEmpty(iteration);
+    });
 }
 
 void IterationQueue::notifyEndOfRequestEmpty() {
   std::unique_lock lg(empty_mutex);
-  NNTR_THROW_IF(flow_state != FlowState::FLOW_STATE_OPEN, std::invalid_argument)
-    << "notifyEndOfRequestEmpty() must be called once";
-
+  auto previous_state =
+    flow_state.exchange(FlowState::FLOW_STATE_STOP_REQUESTED);
+  NNTR_THROW_IF(previous_state != FlowState::FLOW_STATE_OPEN,
+                std::invalid_argument)
+    << "the queue expects a state of "
+    << static_cast<unsigned>(FlowState::FLOW_STATE_OPEN) << " but met "
+    << static_cast<unsigned>(previous_state);
   /// below is useful information when debugging iteration queue, but there will
-  /// be to much log if we turn the log on. so leaving it as a comment for now.
+  /// be too much log if we turn the log on. so leaving it as a comment for now.
   // std::cout << "[notifyEnd] empty_q.size(): " << empty_q.size()
   //           << " num being filled: " << num_being_filled
   //           << " filled_q.size(): " << filled_q.size() << '\n';
 
-  flow_state = FlowState::FLOW_STATE_STOP_REQUESTED;
   if (being_filled) {
     being_filled->setEndSample(current_iterator + 1);
   }
@@ -176,6 +196,18 @@ void IterationQueue::notifyEndOfRequestEmpty() {
   filled_q.push(nullptr);
 }
 
+void IterationQueue::markFilled(MarkableIteration *iteration) {
+  std::unique_lock lg(empty_mutex);
+  num_being_filled--;
+  filled_q.push(iteration);
+  lg.unlock();
+  notify_emptied_cv.notify_all();
+}
+
+void IterationQueue::markEmpty(MarkableIteration *iteration) {
+  empty_q.push(iteration);
+}
+
 IterationQueue::MarkableIteration::MarkableIteration(
   const std::vector<ml::train::TensorDim> &input_dims,
   const std::vector<ml::train::TensorDim> &label_dims, IterationQueue *iq) :
@@ -188,6 +220,12 @@ IterationQueue::MarkableIteration::MarkableIteration(MarkableIteration &&rhs) :
   iteration(std::move(rhs.iteration)),
   iq(rhs.iq) {}
 
+void IterationQueue::MarkableIteration::reset() {
+  std::lock_guard notify_lock_guard(notify_mutex);
+  num_observed = 0;
+  iteration.setEndSample();
+}
+
 IterationQueue::MarkableIteration &IterationQueue::MarkableIteration::
 operator=(MarkableIteration &&rhs) {
   if (this == &rhs) {
@@ -200,18 +238,6 @@ operator=(MarkableIteration &&rhs) {
   return *this;
 }
 
-void IterationQueue::markFilled(MarkableIteration *iteration) /** noexcept */ {
-  std::unique_lock lg(empty_mutex);
-  num_being_filled--;
-  filled_q.push(iteration);
-  lg.unlock();
-  notify_emptied_cv.notify_all();
-}
-
-void IterationQueue::markEmpty(MarkableIteration *iteration) /** noexcept */ {
-  empty_q.push(iteration);
-}
-
 void IterationQueue::MarkableIteration::markSampleFilled() {
   std::scoped_lock notify_lock_guard(notify_mutex);
   num_observed++;
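
The stop handshake above now runs on an atomic flow_state:
notifyEndOfRequestEmpty() moves OPEN to STOP_REQUESTED with exchange(), and
requestFilled() may only move STOP_REQUESTED to STOPPED, enforced with
compare_exchange_strong(). A standalone sketch of that transition (the enum
mirrors the diff; the queue itself is omitted):

// Only a queue in STOP_REQUESTED may move to STOPPED; any other state makes
// the CAS fail, which the real code turns into a throw.
#include <atomic>
#include <cassert>

enum class FlowState {
  FLOW_STATE_OPEN,
  FLOW_STATE_STOP_REQUESTED,
  FLOW_STATE_STOPPED
};

int main() {
  std::atomic<FlowState> flow_state{FlowState::FLOW_STATE_STOP_REQUESTED};

  FlowState expected = FlowState::FLOW_STATE_STOP_REQUESTED;
  bool ok = flow_state.compare_exchange_strong(expected,
                                               FlowState::FLOW_STATE_STOPPED);
  assert(ok && flow_state.load() == FlowState::FLOW_STATE_STOPPED);

  // a second attempt fails and reports the actual state through `expected`,
  // matching the "already stopped or running" error path above
  expected = FlowState::FLOW_STATE_STOP_REQUESTED;
  ok = flow_state.compare_exchange_strong(expected,
                                          FlowState::FLOW_STATE_STOPPED);
  assert(!ok && expected == FlowState::FLOW_STATE_STOPPED);
}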
diff --git a/nntrainer/dataset/batch_queue.h b/nntrainer/dataset/batch_queue.h
index 50e437343a66a59dce26f564c82f4df222eb69d6..53f51db5f9edf65f6f01e37c215e2d291de7b4c0 100644
@@ -15,6 +15,7 @@
 #ifndef __BATCH_QUEUE_H__
 #define __BATCH_QUEUE_H__
 
+#include <atomic>
 #include <condition_variable>
 #include <functional>
 #include <memory>
@@ -182,10 +183,13 @@ public:
    *
    * @param data_ reference of the underlying data
    * @param on_notify_ callback to be called on exit
+   * @param on_error_ callback to be called on error
    */
-  ScopedView(T *data_, std::function<void(void)> &&on_notify_ = nullptr) :
+  ScopedView(T *data_, std::function<void(void)> &&on_notify_ = nullptr,
+             std::function<void(void)> &&on_error_ = nullptr) :
     data(data_),
-    on_notify(std::forward<std::function<void(void)>>(on_notify_)) {}
+    on_notify(std::forward<std::function<void(void)>>(on_notify_)),
+    on_error(std::forward<std::function<void(void)>>(on_error_)) {}
 
   ScopedView(const ScopedView &rhs) = delete;
   ScopedView &operator=(const ScopedView &rhs) = delete;
@@ -207,7 +211,9 @@ public:
   ~ScopedView() {
     try {
       if (std::uncaught_exceptions()) {
-        /// NYI, add on_error handler here
+        if (on_error) {
+          on_error();
+        }
       } else {
         if (on_notify) {
           on_notify();
@@ -236,6 +242,7 @@ private:
   T *data; /**< underlying data pointer */
   std::function<void(void)>
     on_notify; /**< called when destroyed without error */
+  std::function<void(void)> on_error; /**< called when destroyed with error */
 };
 
 /**
@@ -314,7 +321,7 @@ public:
    *
    * @return unsigned int size of batch
    */
-  unsigned int batch() { return iterations.front().get().batch(); }
+  unsigned int batch() { return batch_size; }
 
   /**
    * @brief notifyEndOfRequest, when the producing by requestEmpty has finished.
@@ -348,6 +355,12 @@ private:
                       const std::vector<ml::train::TensorDim> &label_dims,
                       IterationQueue *iq);
 
+    /**
+     * @brief reset num observation and internal batch size of iteration
+     *
+     */
+    void reset();
+
     /**
      * @brief Construct a new Markable Iteration object
      *
@@ -433,8 +446,9 @@ private:
   std::condition_variable_any
     notify_emptied_cv;  /**< conditional variable to wait based on the
                            num_being_filled */
-  FlowState flow_state; /**< flow state of the queue */
+  std::atomic<FlowState> flow_state; /**< flow state of the queue */
 
+  unsigned int batch_size; /**< batch size of each iteration, cached at
+                              construction time */
   ViewQueue<MarkableIteration> empty_q;  /**< iterations to be filled */
   ViewQueue<MarkableIteration> filled_q; /**< iterations to be served */
 };
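
For a producer that never throws, nothing changes; the new callback only
matters when filling fails mid-flight. A hedged sketch of a producer loop
under that assumption (fillSample is a made-up helper, not part of
nntrainer):

// If fillSample() throws, sample_view is destroyed during unwinding and
// on_error returns the slot to empty_q, so the consumer is never handed a
// half-filled iteration. Assumes the nntrainer headers above.
void produceAll(nntrainer::IterationQueue &iq, unsigned int num_samples) {
  for (unsigned int i = 0; i < num_samples; ++i) {
    auto sample_view = iq.requestEmpty(); // ScopedView<Sample>
    fillSample(sample_view.get(), i);     // may throw -> rollback via on_error
  }                                       // normal exit -> markSampleFilled()
  iq.notifyEndOfRequestEmpty();           // no more samples will be produced
}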
diff --git a/nntrainer/dataset/data_iteration.cpp b/nntrainer/dataset/data_iteration.cpp
index 91c95a10fca0ce07a99394a2fed2f9f3afbc7f8f..692f9605ec6300ee9faf438ae79857f84573d2dd 100644
@@ -91,6 +91,8 @@ void Iteration::setEndSample(std::vector<Sample>::iterator sample_iterator) {
   end_iterator = sample_iterator;
 }
 
+void Iteration::setEndSample() { end_iterator = samples.end(); }
+
 Sample::Sample(const Iteration &iter, unsigned int batch) :
   inputs(sliceTensor(iter.getInputsRef(), batch)),
   labels(sliceTensor(iter.getLabelsRef(), batch)) {}
diff --git a/nntrainer/dataset/data_iteration.h b/nntrainer/dataset/data_iteration.h
index 73ddae8fe465de08ec17bd1f15c937e14f6d73ce..efda996e946fe8f0ef1e5cc4526bfab2a319cd4d 100644
@@ -118,6 +118,12 @@ public:
    */
   void setEndSample(std::vector<Sample>::iterator sample_iterator);
 
+  /**
+   * @brief Set the End Sample to the original end
+   *
+   */
+  void setEndSample();
+
 private:
   std::vector<Tensor> inputs, labels;
   std::vector<Sample> samples;
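
The new zero-argument setEndSample() exists so MarkableIteration::reset() can
restore an iteration whose end was shrunk for a partially filled last batch.
A toy sketch of that end-marker idea (names mirror the diff; the class itself
is made up):

#include <cassert>
#include <iterator>
#include <vector>

struct IterationSketch {
  std::vector<int> samples{0, 1, 2, 3};
  std::vector<int>::iterator end_iterator = samples.end();

  // mirror of setEndSample(iterator): serve only a prefix of the batch
  void setEndSample(std::vector<int>::iterator it) { end_iterator = it; }
  // mirror of the new setEndSample(): restore the full batch on reuse
  void setEndSample() { end_iterator = samples.end(); }
};

int main() {
  IterationSketch it;
  it.setEndSample(it.samples.begin() + 2); // last batch only half filled
  assert(std::distance(it.samples.begin(), it.end_iterator) == 2);
  it.setEndSample();                       // reset() restores the full range
  assert(std::distance(it.samples.begin(), it.end_iterator) == 4);
}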
diff --git a/test/unittest/datasets/unittest_batch_queue.cpp b/test/unittest/datasets/unittest_batch_queue.cpp
index 9e1d7ee07898ae6a4fc0f08ad41bde76a46d7e8b..eb6e808e5664c6dc0760ad7a2934a748a3919f1f 100644
@@ -318,8 +318,8 @@ TEST_P(IterQueueScenarios,
        produceAndConsumAsyncForDeterminedSizeConsumerRunningFirst_p) {
 
   auto producer = std::async(std::launch::async, [this]() {
-    sleep(1);
-    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+    std::this_thread::sleep_for(50ms);
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
       produceSample(i);
     }
   });
@@ -346,7 +346,7 @@ TEST_P(IterQueueScenarios,
   });
 
   auto consumer = std::async(std::launch::async, [this]() {
-    sleep(1);
+    std::this_thread::sleep_for(50ms);
     for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
       consumeIteration();
     }
@@ -368,7 +368,7 @@ TEST_P(IterQueueScenarios,
   });
 
   auto consumer = std::async(std::launch::async, [this]() {
-    sleep(1);
+    std::this_thread::sleep_for(50ms);
     while (consumeIteration()) {
     }
   });
@@ -382,7 +382,7 @@ TEST_P(IterQueueScenarios,
 TEST_P(IterQueueScenarios,
        produceAndConsumAsyncForUnknownSizeConsumerRunningFirst_p) {
   auto producer = std::async(std::launch::async, [this]() {
-    sleep(1);
+    std::this_thread::sleep_for(50ms);
     for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
       produceSample(i);
     }
@@ -521,7 +521,7 @@ TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) {
 
     notify_result =
       std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
-    std::this_thread::sleep_for(500ms);
+    std::this_thread::sleep_for(50ms);
     /// delaying destroying scoped_views to simulate samples are in
     /// the state of being filled
   }
@@ -559,7 +559,7 @@ TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) {
     sum_from_consumer += getSum(inputs, labels);
     EXPECT_FALSE(consumeIteration());
 
-    std::this_thread::sleep_for(500ms);
+    std::this_thread::sleep_for(50ms);
     /// delay here to delay destroying iter_view to simulate
     /// notifyEndOfRequestEmpty() is being called during destroying the last
     /// buffer
@@ -582,14 +582,62 @@ TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) {
   EXPECT_ANY_THROW(iq->requestEmpty());
 }
 
-TEST_P(IterQueueScenarios,
-       DISABLED_ScopedViewSampleHandlesThrowWhileFillingFails_n) {
-  /// NYI
+TEST_P(IterQueueScenarios, ScopedViewSampleHandlesThrowWhileFillingFails_n) {
+  std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
+  std::shared_future<void> ready_future(ready_promise.get_future());
+  auto request_fail = std::async(std::launch::async, [&, this] {
+    t1_ready_promise.set_value();
+    ready_future.wait();
+    auto sample_view = iq->requestEmpty();
+    throw std::invalid_argument("while filling, it failed");
+  });
+
+  auto try_consume = std::async(std::launch::async, [&, this] {
+    t2_ready_promise.set_value();
+    ready_future.wait();
+    auto iter_view = iq->requestFilled();
+    EXPECT_TRUE(iter_view.isEmpty());
+  });
+
+  t1_ready_promise.get_future().wait();
+  t2_ready_promise.get_future().wait();
+  ready_promise.set_value();
+
+  EXPECT_THROW(request_fail.get(), std::invalid_argument);
+  iq->notifyEndOfRequestEmpty();
+  try_consume.wait();
 }
 
-TEST_P(IterQueueScenarios,
-       DISABLED_ScopedViewIterationHandlesThrowWhileFillingFails_n) {
-  /// NYI
+TEST_P(IterQueueScenarios, ScopedViewIterationHandlesThrowWhileFillingFails_n) {
+  std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
+  std::shared_future<void> ready_future(ready_promise.get_future());
+  auto feed_data = std::async(std::launch::async, [&, this] {
+    t1_ready_promise.set_value();
+    ready_future.wait();
+    for (unsigned i = 0; i < iq->batch(); ++i) {
+      auto sample_view = iq->requestEmpty();
+    }
+  });
+
+  auto consume_fail = std::async(std::launch::async, [&, this] {
+    t2_ready_promise.set_value();
+    ready_future.wait();
+    auto iter_view = iq->requestFilled();
+    throw std::invalid_argument("while using, it failed");
+  });
+
+  t1_ready_promise.get_future().wait();
+  t2_ready_promise.get_future().wait();
+  ready_promise.set_value();
+
+  EXPECT_THROW(consume_fail.get(), std::invalid_argument);
+  feed_data.wait();
+  EXPECT_ANY_THROW(iq->requestEmpty());
+  EXPECT_TRUE(iq->requestFilled().isEmpty());
 }
 
 IterQueueTestParamType multi_slot_multi_batch = {
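
The two ScopedView failure tests rely on a promise/shared_future rendezvous
so the producing and consuming threads hit the queue at nearly the same
instant. The pattern in isolation (self-contained, nothing
nntrainer-specific):

#include <future>
#include <iostream>

int main() {
  std::promise<void> go, t1_ready, t2_ready;
  std::shared_future<void> go_future(go.get_future());

  auto worker = [&go_future](std::promise<void> &ready, const char *name) {
    ready.set_value(); // tell main we reached the start line
    go_future.wait();  // block until main fires the starting gun
    std::cout << name << " running\n";
  };

  auto t1 = std::async(std::launch::async, worker, std::ref(t1_ready), "t1");
  auto t2 = std::async(std::launch::async, worker, std::ref(t2_ready), "t2");

  t1_ready.get_future().wait(); // wait until both workers are parked
  t2_ready.get_future().wait();
  go.set_value();               // release both at once

  t1.get();
  t2.get();
}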
diff --git a/test/unittest/datasets/unittest_iteration_queue.cpp b/test/unittest/datasets/unittest_iteration_queue.cpp
new file mode 100644
index 0000000..747b00f
--- /dev/null
@@ -0,0 +1,598 @@
+// SPDX-License-Identifier: Apache-2.0
+/**
+ * Copyright (C) 2021 Jihoon Lee <jhoon.it.lee@samsung.com>
+ *
+ * @file unittest_iteration_queue.cpp
+ * @date 12 July 2021
+ * @brief Iteration Queue Test
+ * @see        https://github.com/nnstreamer/nntrainer
+ * @author Jihoon Lee <jhoon.it.lee@samsung.com>
+ * @bug No known bugs except for NYI items
+ */
+
+#include <gtest/gtest.h>
+
+#include <iteration_queue.h>
+#include <random_data_producers.h>
+#include <tensor.h>
+
+#include <algorithm>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <numeric>
+#include <queue>
+#include <thread>
+#include <tuple>
+#include <vector>
+
+using namespace std::chrono_literals;
+
+/** This number must be divisible by both 3 and 4 to match the current
+ * multi-batch scenario setup. @todo this should rather be part of
+ * IterQueueTestParamType */
+static constexpr unsigned int DATA_SIZE = 384u;
+using IterQueueTestParamType =
+  std::tuple<unsigned int /**< queue size */,
+             std::vector<nntrainer::TensorDim> /**< input dimensions */,
+             std::vector<nntrainer::TensorDim> /**< label dimensions */>;
+
+/**
+ * @brief Iteration Queue Test
+ */
+class IterQueueScenarios
+  : public ::testing::TestWithParam<IterQueueTestParamType> {
+public:
+  /**
+   * @brief SetUp test cases here
+   *
+   */
+  virtual void SetUp() {
+    auto &[q_size, input_dims, label_dims] = GetParam();
+    iq = std::make_unique<nntrainer::IterationQueue>(q_size, input_dims,
+                                                     label_dims);
+    auto producer = std::make_unique<nntrainer::RandomDataOneHotProducer>();
+    producer->setProperty({"num_samples=512"});
+    sample_getter = producer->finalize(input_dims, label_dims);
+    this->input_dims = input_dims;
+    this->label_dims = label_dims;
+    sum_from_producer = 0;
+    sum_from_consumer = 0;
+  }
+
+  virtual void
+  produceSample(unsigned int size,
+                const std::chrono::milliseconds *duration = nullptr) {
+    auto sample_view = iq->requestEmpty();
+    if (sample_view.isEmpty()) {
+      throw std::runtime_error("sample_view is empty!");
+    }
+    auto &sample = sample_view.get();
+    auto &inputs = sample.getInputsRef();
+    auto &labels = sample.getLabelsRef();
+    if (duration) {
+      std::this_thread::sleep_for(*duration);
+    }
+    std::lock_guard<std::mutex> lg(producer_mutex);
+    sample_getter(size, inputs, labels);
+    sum_from_producer += getSum(inputs, labels);
+  }
+
+  virtual std::future<void>
+  produceSampleAfter(unsigned int size,
+                     const std::chrono::milliseconds &duration) {
+    return std::async(std::launch::async, [this, size, duration] {
+      produceSample(size, &duration);
+    });
+  }
+
+  virtual std::future<bool>
+  consumeIterationAfter(const std::chrono::milliseconds &duration) {
+    return std::async(std::launch::async, [this, duration] {
+      std::this_thread::sleep_for(duration);
+      return consumeIteration();
+    });
+  }
+
+  virtual bool consumeIteration() {
+    auto iter_view = iq->requestFilled();
+    if (iter_view.isEmpty()) {
+      return false;
+    }
+
+    auto &iter = iter_view.get();
+    auto &inputs = iter.getInputsRef();
+    auto &labels = iter.getLabelsRef();
+    std::lock_guard<std::mutex> lg(consumer_mutex);
+    sum_from_consumer += getSum(inputs, labels);
+    return true;
+  }
+
+  /**
+   * @brief do here if any memory needs to be released
+   *
+   */
+  virtual void TearDown() {}
+
+protected:
+  /**
+   * @brief Get a single value (sum) for the given inputs and outputs, this is
+   * to effectively reduce a tensor to a single value
+   *
+   * @param inputs inputs
+   * @param labels labels
+   * @return long double single value which sums everything
+   */
+  long double getSum(const std::vector<nntrainer::Tensor> &inputs,
+                     const std::vector<nntrainer::Tensor> &labels) {
+    auto accumulator = [](long double old_sum, const nntrainer::Tensor &t) {
+      return old_sum + (long double)t.sum({0, 1, 2, 3}).getValue(0, 0, 0, 0);
+    };
+
+    long double sum =
+      std::accumulate(inputs.begin(), inputs.end(), 0.0l, accumulator);
+    return std::accumulate(labels.begin(), labels.end(), sum, accumulator);
+  }
+
+  /** @todo the two sums below can differ by around 1e-8; this should be
+   * changed to an integer-based comparison */
+  long double
+    sum_from_producer; /**< sum of all the dataset from producer side*/
+  long double
+    sum_from_consumer; /**< sum of all the dataset from consumer side */
+
+  mutable std::mutex producer_mutex, consumer_mutex;
+  nntrainer::DataProducer::Generator sample_getter;
+  std::unique_ptr<nntrainer::IterationQueue> iq;
+  std::vector<nntrainer::TensorDim> input_dims; /**< input dims */
+  std::vector<nntrainer::TensorDim> label_dims; /**< output dims */
+};
+
+TEST_P(IterQueueScenarios, produceAndConsumeSingle_p) {
+  auto batch_size = iq->batch();
+  for (unsigned int i = 0; i < batch_size; ++i) {
+    produceSample(i);
+  }
+  consumeIteration();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, produceAndConsumeOnce_p) {
+  auto q_size = iq->slots();
+  auto q_size_in_sample = q_size * iq->batch();
+  /// step1: fill buffer to the queue (filling 0 ~ 11th samples)
+  for (unsigned int i = 0; i < q_size_in_sample; ++i) {
+    produceSample(i);
+  }
+
+  /// step2: consume the filled buffer from the queue
+  for (unsigned int i = 0; i < q_size; ++i) {
+    consumeIteration();
+  }
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, produceAndConsumeSyncTwice_p) {
+  auto q_size = iq->slots();
+  auto q_size_in_sample = q_size * iq->batch();
+  /// step1: fill buffer to the queue (filling full queue)
+  for (unsigned int i = 0; i < q_size_in_sample; ++i) {
+    produceSample(i);
+  }
+
+  /// step2: consume the filled buffer from the queue
+  for (unsigned int i = 0; i < q_size; ++i) {
+    consumeIteration();
+  }
+
+  /// step3: fill buffer to the queue (filling full queue)
+  for (unsigned int i = q_size_in_sample; i < q_size_in_sample * 2; ++i) {
+    produceSample(i);
+  }
+
+  /// step4: consume the filled buffer from the queue
+  for (unsigned int i = 0; i < q_size; ++i) {
+    consumeIteration();
+  }
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, produceAndConsumeSyncMixed_p) {
+  auto q_size = iq->slots();
+  auto q_size_in_sample = q_size * iq->batch();
+  /// step1: fill buffer to the queue (filling half samples)
+  for (unsigned int i = 0; i < q_size_in_sample / 2; ++i) {
+    produceSample(i);
+  }
+
+  /// step2: consume the filled buffer from the queue
+  for (unsigned int i = 0; i < q_size / 2; ++i) {
+    consumeIteration();
+  }
+
+  /// step3: fill buffer to the queue (filling rest half samples)
+  for (unsigned int i = q_size_in_sample / 2; i < q_size_in_sample; ++i) {
+    produceSample(i);
+  }
+
+  /// step4: consume the filled buffer from the queue
+  for (unsigned int i = q_size / 2; i < q_size; ++i) {
+    consumeIteration();
+  }
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForDeterminedSizeConsumerRunningFirst_p) {
+
+  auto producer = std::async(std::launch::async, [this]() {
+    std::this_thread::sleep_for(50ms);
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+      consumeIteration();
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForDeterminedSizeProducerRunningFirst_p) {
+
+  auto producer = std::async(std::launch::async, [this]() {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    std::this_thread::sleep_for(50ms);
+    for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+      consumeIteration();
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForUnknownSizeProducerRunningFirst_p) {
+  auto producer = std::async(std::launch::async, [this]() {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+    iq->notifyEndOfRequestEmpty();
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    std::this_thread::sleep_for(50ms);
+    while (consumeIteration()) {
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios,
+       produceAndConsumAsyncForUnknownSizeConsumerRunningFirst_p) {
+  auto producer = std::async(std::launch::async, [this]() {
+    std::this_thread::sleep_for(50ms);
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+    iq->notifyEndOfRequestEmpty();
+  });
+
+  auto consumer = std::async(std::launch::async, [this]() {
+    while (consumeIteration()) {
+    }
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, produceAndConsumPartiallyFilledBatch_p) {
+  auto b = iq->batch();
+  if (b == 1) {
+    return; /// if batch is one, there is no partially filled batch
+  }
+
+  auto getSumOfPartiallyFilledTensor =
+    [](const nntrainer::Tensor &t, unsigned int actual_batch) -> long double {
+    long double sum = 0;
+    nntrainer::Tensor result = t.sum_by_batch();
+    for (unsigned int i = 0; i < actual_batch; ++i) {
+      sum += result.getValue(i, 0, 0, 0);
+    }
+    return sum;
+  };
+
+  for (unsigned int i = 0; i < b - 1; ++i) {
+    produceSample(i);
+  }
+
+  iq->notifyEndOfRequestEmpty();
+  {
+    auto iter_view = iq->requestFilled();
+    if (iter_view.isEmpty()) {
+      throw std::invalid_argument("iter view is empty!");
+    }
+    nntrainer::Iteration &it = iter_view.get();
+
+    auto &inputs = it.getInputsRef();
+    auto &labels = it.getLabelsRef();
+
+    for (auto &input : inputs) {
+      sum_from_consumer += getSumOfPartiallyFilledTensor(input, it.batch());
+    }
+    for (auto &label : labels) {
+      sum_from_consumer += getSumOfPartiallyFilledTensor(label, it.batch());
+    }
+  }
+  EXPECT_FALSE(consumeIteration());
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+/**
+ * When calling notifyEndOfRequestEmpty(), the queue can be in one of four
+ * states.
+ *
+ * The notification that there will be no more filling can arrive while...
+ * 1. the last buffer is already consumed and already marked as emptied
+ * 2. the last buffer has moved to filled_q
+ * 3. the last buffer is being filled, and there are multiple buffers being
+ * filled
+ * 4. the last buffer is being served
+ *
+ */
+
+TEST_P(IterQueueScenarios, caseOneNotifyAfterConsumingIsFinished_p) {
+
+  std::promise<void> consume_done;
+  auto producer = std::async(std::launch::async, [this, &consume_done]() {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
+      produceSample(i);
+    }
+    consume_done.get_future().get();
+    iq->notifyEndOfRequestEmpty();
+  });
+
+  auto consumer = std::async(std::launch::async, [this, &consume_done]() {
+    for (unsigned int i = 0u; i < DATA_SIZE / iq->batch(); ++i) {
+      consumeIteration();
+    }
+    consume_done.set_value();
+    EXPECT_FALSE(consumeIteration());
+  });
+
+  producer.get();
+  consumer.get();
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, caseTwoNotifyAfterTheLastBufferHasMovedToFilledQ_p) {
+  std::vector<std::future<void>> producer_result;
+
+  unsigned int number_of_producing = iq->batch() * iq->slots();
+  producer_result.reserve(number_of_producing);
+  for (unsigned int i = 0; i < number_of_producing; ++i) {
+    producer_result.push_back(produceSampleAfter(i, 50ms));
+  }
+
+  for (auto &fut : producer_result) {
+    fut.get();
+  }
+  iq->notifyEndOfRequestEmpty();
+
+  for (unsigned int i = 0; i < iq->slots(); ++i) {
+    EXPECT_TRUE(consumeIteration());
+  }
+  EXPECT_FALSE(consumeIteration());
+
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) {
+  std::future<void> notify_result;
+  {
+    std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
+    unsigned int number_of_producing = iq->batch() * iq->slots();
+    for (unsigned int i = 0; i < number_of_producing; ++i) {
+      scoped_views.push(iq->requestEmpty());
+      if (scoped_views.back().isEmpty()) {
+        throw std::runtime_error("sample was empty");
+      }
+      auto &sample = scoped_views.back().get();
+      auto &inputs = sample.getInputsRef();
+      auto &labels = sample.getLabelsRef();
+      sample_getter(i, inputs, labels);
+      sum_from_producer += getSum(inputs, labels);
+    }
+
+    notify_result =
+      std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
+    std::this_thread::sleep_for(50ms);
+    /// delaying destroying scoped_views to simulate samples are in
+    /// the state of being filled
+  }
+  notify_result.get();
+
+  for (unsigned int i = 0; i < iq->slots(); ++i) {
+    EXPECT_TRUE(consumeIteration());
+  }
+  EXPECT_FALSE(consumeIteration());
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) {
+  std::future<void> notify_result;
+  unsigned int number_of_producing = iq->batch() * iq->slots();
+
+  std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
+  for (unsigned int i = 0; i < number_of_producing; ++i) {
+    produceSample(i);
+  }
+
+  for (unsigned int i = 0; i < iq->slots() - 1; ++i) {
+    EXPECT_TRUE(consumeIteration());
+  }
+  {
+    auto iter_view = iq->requestFilled();
+    notify_result =
+      std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
+    if (iter_view.isEmpty()) {
+      throw std::invalid_argument("iter_view is empty!");
+    }
+    auto &iter = iter_view.get();
+    auto &inputs = iter.getInputsRef();
+    auto &labels = iter.getLabelsRef();
+    sum_from_consumer += getSum(inputs, labels);
+    EXPECT_FALSE(consumeIteration());
+
+    std::this_thread::sleep_for(50ms);
+    /// delay here to delay destroying iter_view to simulate
+    /// notifyEndOfRequestEmpty() is being called during destroying the last
+    /// buffer
+  }
+
+  notify_result.get();
+  EXPECT_FLOAT_EQ(sum_from_producer, sum_from_consumer);
+}
+
+/** @todo add a test that simulates multiple workers; for now this is
+ * impossible because we don't have a thread-safe worker */
+
+TEST_P(IterQueueScenarios, notifyEndTwice_n) {
+  iq->notifyEndOfRequestEmpty();
+  EXPECT_ANY_THROW(iq->notifyEndOfRequestEmpty());
+}
+
+TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) {
+  iq->notifyEndOfRequestEmpty();
+  EXPECT_ANY_THROW(iq->requestEmpty());
+}
+
+TEST_P(IterQueueScenarios, ScopedViewSampleHandlesThrowWhileFillingFails_n) {
+  std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
+  std::shared_future<void> ready_future(ready_promise.get_future());
+  auto request_fail = std::async(std::launch::async, [&, this] {
+    t1_ready_promise.set_value();
+    ready_future.wait();
+    auto sample_view = iq->requestEmpty();
+    throw std::invalid_argument("while filling, it failed");
+  });
+
+  auto try_consume = std::async(std::launch::async, [&, this] {
+    t2_ready_promise.set_value();
+    ready_future.wait();
+    auto iter_view = iq->requestFilled();
+    EXPECT_TRUE(iter_view.isEmpty());
+  });
+
+  t1_ready_promise.get_future().wait();
+  t2_ready_promise.get_future().wait();
+  ready_promise.set_value();
+
+  EXPECT_THROW(request_fail.get(), std::invalid_argument);
+  iq->notifyEndOfRequestEmpty();
+  try_consume.wait();
+}
+
+TEST_P(IterQueueScenarios, ScopedViewIterationHandlesThrowWhileFillingFails_n) {
+  std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
+  std::shared_future<void> ready_future(ready_promise.get_future());
+  auto feed_data = std::async(std::launch::async, [&, this] {
+    t1_ready_promise.set_value();
+    ready_future.wait();
+    for (unsigned i = 0; i < iq->batch(); ++i) {
+      auto sample_view = iq->requestEmpty();
+    }
+  });
+
+  auto consume_fail = std::async(std::launch::async, [&, this] {
+    t2_ready_promise.set_value();
+    ready_future.wait();
+    auto iter_view = iq->requestFilled();
+    throw std::invalid_argument("while using, it failed");
+  });
+
+  t1_ready_promise.get_future().wait();
+  t2_ready_promise.get_future().wait();
+  ready_promise.set_value();
+
+  EXPECT_THROW(consume_fail.get(), std::invalid_argument);
+  feed_data.wait();
+  EXPECT_ANY_THROW(iq->requestEmpty());
+  EXPECT_TRUE(iq->requestFilled().isEmpty());
+}
+
+IterQueueTestParamType multi_slot_multi_batch = {
+  4 /** queue size */,
+  {{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,
+  {{3, 1, 1, 8}, {3, 1, 1, 2}} /** label_dims */};
+
+IterQueueTestParamType single_slot_multi_batch = {
+  1 /** queue size */,
+  {{3, 2, 4, 5}, {3, 4, 5, 7}} /** input_dims*/,
+  {{3, 1, 1, 8}, {3, 1, 1, 2}} /** label_dims */};
+
+IterQueueTestParamType multi_slot_single_batch = {
+  3 /** queue size */,
+  {{1, 2, 4, 5}, {1, 4, 5, 7}} /** input_dims*/,
+  {{1, 1, 1, 8}, {1, 1, 1, 2}} /** label_dims */};
+
+IterQueueTestParamType single_slot_single_batch = {
+  1 /** queue size */,
+  {{1, 2, 4, 5}, {1, 4, 5, 7}} /** input_dims*/,
+  {{1, 1, 1, 8}, {1, 1, 1, 2}} /** label_dims */};
+
+INSTANTIATE_TEST_CASE_P(IterQueue, IterQueueScenarios,
+                        ::testing::Values(multi_slot_multi_batch,
+                                          single_slot_multi_batch,
+                                          multi_slot_single_batch,
+                                          single_slot_single_batch));
+
+TEST(IterQueue, constructEmptySlots_n) {
+  EXPECT_ANY_THROW(nntrainer::IterationQueue(0, {{1}}, {{1}}));
+}
+
+TEST(IterQueue, constructEmptyInput_n) {
+  EXPECT_ANY_THROW(nntrainer::IterationQueue(1, {}, {{1}}));
+}
+
+TEST(IterQueue, constructNotConsistentBatchSizeBetweenInputs_n) {
+  EXPECT_ANY_THROW(
+    nntrainer::IterationQueue(1, {{3, 1, 1, 10}, {2, 1, 1, 10}}, {}));
+}
+
+TEST(IterQueue, constructNotConsistentBatchSizeInLabel_n) {
+  EXPECT_ANY_THROW(nntrainer::IterationQueue(1, {{3, 1, 1, 10}, {3, 1, 1, 10}},
+                                             {{2, 1, 1, 10}}));
+}
+
+TEST(IterQueue, constructNotConsistentBatchSizeInLabel2_n) {
+  EXPECT_ANY_THROW(nntrainer::IterationQueue(1, {{3, 1, 1, 10}, {3, 1, 1, 10}},
+                                             {{3, 1, 1, 10}, {2, 1, 1, 10}}));
+}