[Dataset] Reflect final reviews
authorJihoon Lee <jhoon.it.lee@samsung.com>
Fri, 27 Aug 2021 10:39:45 +0000 (19:39 +0900)
committerJijoong Moon <jijoong.moon@samsung.com>
Fri, 27 Aug 2021 11:44:58 +0000 (20:44 +0900)
**Changes proposed in this PR:**
- rename requestEmpty()/requestFilled() to requestEmptySlot()/requestFilledSlot()
- change raw_file_data_producer

**Self evaluation:**
1. Build test: [X]Passed [ ]Failed [ ]Skipped
2. Run test: [X]Passed [ ]Failed [ ]Skipped

Signed-off-by: Jihoon Lee <jhoon.it.lee@samsung.com>
nntrainer/dataset/databuffer.cpp
nntrainer/dataset/iteration_queue.cpp
nntrainer/dataset/iteration_queue.h
nntrainer/dataset/raw_file_data_producer.cpp
test/unittest/datasets/unittest_iteration_queue.cpp

index c596ae0..cd30854 100644 (file)
@@ -106,7 +106,7 @@ DataBuffer::startFetchWorker(const std::vector<TensorDim> &input_dims,
       auto notifier = NotifyOnDestruct(iq.get());
       for (unsigned int i = 0; i < DataProducer::SIZE_UNDEFINED; ++i) {
         /// below loop can be parallelized
-        auto sample_view = iq->requestEmpty();
+        auto sample_view = iq->requestEmptySlot();
         NNTR_THROW_IF(sample_view.isEmpty(), std::runtime_error)
           << "[Databuffer] Cannot fill empty buffer";
         auto &sample = sample_view.get();
@@ -138,7 +138,7 @@ DataBuffer::startFetchWorker(const std::vector<TensorDim> &input_dims,
     auto notifier = NotifyOnDestruct(iq.get());
     for (unsigned int i = 0; i < size; ++i) {
       /// below loop can be parallelized
-      auto sample_view = iq->requestEmpty();
+      auto sample_view = iq->requestEmptySlot();
       NNTR_THROW_IF(sample_view.isEmpty(), std::runtime_error)
         << "[Databuffer] Cannot fill empty buffer";
       auto &sample = sample_view.get();
@@ -161,7 +161,7 @@ ScopedView<Iteration> DataBuffer::fetch() {
   NNTR_THROW_IF(!iq, std::runtime_error)
     << "Cannot fetch, either fetcher is not running or fetcher has ended and "
        "invalidated";
-  return iq->requestFilled();
+  return iq->requestFilledSlot();
 }
 
 std::tuple<DataProducer::Generator /** generator */, unsigned int /** size */>
index 3c2b2e1..a2758f5 100644 (file)
@@ -51,7 +51,7 @@ IterationQueue::~IterationQueue() {
   }
 }
 
-ScopedView<Sample> IterationQueue::requestEmpty() {
+ScopedView<Sample> IterationQueue::requestEmptySlot() {
   std::scoped_lock lg(empty_mutex);
   auto current_flow_state = flow_state.load();
   NNTR_THROW_IF(current_flow_state != FlowState::FLOW_STATE_OPEN,
@@ -62,7 +62,7 @@ ScopedView<Sample> IterationQueue::requestEmpty() {
 
   /// below is useful information when debugging iteration queue, but there will
   /// be too much log if we turn the log on. so leaving it as a comment for now.
-  // std::cout << "[requestEmpty] empty_q.size(): " << empty_q.size()
+  // std::cout << "[requestEmptySlot] empty_q.size(): " << empty_q.size()
   // << " being_filled: " << num_being_filled
   // << " filled_q.size():  " << filled_q.size() << '\n';
 
@@ -90,12 +90,12 @@ ScopedView<Sample> IterationQueue::requestEmpty() {
   return view;
 }
 
-ScopedView<Iteration> IterationQueue::requestFilled() {
+ScopedView<Iteration> IterationQueue::requestFilledSlot() {
   std::scoped_lock lock(filled_mutex);
 
   /// below is useful information when debugging iteration queue, but there will
   /// be too much log if we turn the log on. so leaving it as a comment for now.
-  // std::cout << "[requestFilled] empty_q.size(): " << empty_q.size()
+  // std::cout << "[requestFilledSlot] empty_q.size(): " << empty_q.size()
   // << " num being filled: " << num_being_filled
   // << " filled_q.size(): " << filled_q.size() << '\n';
   if (flow_state.load() == FlowState::FLOW_STATE_STOPPED) {
@@ -129,6 +129,12 @@ ScopedView<Iteration> IterationQueue::requestFilled() {
 void IterationQueue::notifyEndOfRequestEmpty() {
   std::unique_lock lg(empty_mutex);
   auto open_state = FlowState::FLOW_STATE_OPEN;
+
+  /// we have to define the ordering of stop_requested -> push nullptr to
+  /// filled_q -> stopped, so in the case of changing to stopped it has to push
+  /// nullptr to empty_q and filled_q to wake them up and stop. this has
+  /// potential cases that weren't considered. let's change this to a simpler
+  /// mechanism to wait on a condition variable.
   bool exchange_result = flow_state.compare_exchange_strong(
     open_state, FlowState::FLOW_STATE_STOP_REQUESTED);
   NNTR_THROW_IF(!exchange_result, std::invalid_argument)
index 1b84da8..1912333 100644 (file)
@@ -177,11 +177,11 @@ private:
  * @brief Iteration queue that owns the buffer for input / labels
  * @details
  *
- * - requestEmpty() will give a ScopedView<sample>
+ * - requestEmptySlot() will give a ScopedView<Sample>
  *     Destructing the returned object will notify the iteration that is done
  * filling the sample. Once iteration is done filling, it will internally call
  * IterationQueue::markFilled();
- * - requestFilled() will give a ScopedView<Iteration>
+ * - requestFilledSlot() will give a ScopedView<Iteration>
  *     Destructing this will notify the queue that is done used (internally
  * calls IterationQueue::markEmpty())
  *
@@ -224,7 +224,7 @@ public:
    * if there is no more data coming. Destroying the returned object will
    * signal the queue that the sample is filled.
    */
-  ScopedView<Sample> requestEmpty();
+  ScopedView<Sample> requestEmptySlot();
 
   /**
    * @brief request filled iteration from the queue.
@@ -235,7 +235,7 @@ public:
    * signal the queue that the sample is done using.
    *
    */
-  ScopedView<Iteration> requestFilled();
+  ScopedView<Iteration> requestFilledSlot();
 
   /**
    * @brief get slot size, slot size is number of batches inside the queue
@@ -252,10 +252,11 @@ public:
   unsigned int batch() { return batch_size; }
 
   /**
-   * @brief notifyEndOfRequest, when the producing by requestEmpty has finished.
+   * @brief notifyEndOfRequest, when the producing by requestEmptySlot has
+   * finished.
    * @note It is important that the owner of this class must ensure that there
-   * will be no more requestEmpty call after this. This means that, in case of
-   * multiple workers, the manager of the worker(producer) must know every
+   * will be no more requestEmptySlot call after this. This means that, in case
+   * of multiple workers, the manager of the worker(producer) must know every
    * producer has finished. and call this function other than each worker call
    * this function.
    *
index 8f951df..a9fe430 100644 (file)
@@ -59,27 +59,16 @@ RawFileDataProducer::finalize(const std::vector<TensorDim> &input_dims,
   sample_size = std::accumulate(label_dims.begin(), label_dims.end(),
                                 sample_size, size_accumulator);
 
-  /****************** Prepare states ****************/
-  auto idxes_ = std::vector<unsigned int>();
-  idxes_.reserve(sz);
-  /// idxes point to the file position in bytes where a sample starts
-  std::generate_n(std::back_inserter(idxes_), sz,
-                  [sample_size, current = 0ULL]() mutable {
-                    auto c = current;
-                    current += sample_size * RawFileDataProducer::pixel_size;
-                    return c;
-                  });
-
   /// as we are passing the reference of the file, the created lambda is
   /// tightly coupled with the file; this is not desirable but working fine
   /// for now...
   file = std::ifstream(path_prop.get(), std::ios::binary);
-  return [idxes = std::move(idxes_), sz, this](unsigned int idx,
-                                               std::vector<Tensor> &inputs,
-                                               std::vector<Tensor> &labels) {
+  return [sample_size, sz, this](unsigned int idx, std::vector<Tensor> &inputs,
+                                 std::vector<Tensor> &labels) {
     NNTR_THROW_IF(idx >= sz, std::range_error)
       << "given index is out of bound, index: " << idx << " size: " << sz;
-    file.seekg(idxes[idx], std::ios_base::beg);
+    file.seekg(idx * sample_size * RawFileDataProducer::pixel_size,
+               std::ios_base::beg);
     for (auto &input : inputs) {
       input.read(file);
     }
index 915c740..75629ca 100644 (file)
@@ -59,7 +59,7 @@ public:
   virtual void
   produceSample(unsigned int size,
                 const std::chrono::milliseconds *duration = nullptr) {
-    auto sample_view = iq->requestEmpty();
+    auto sample_view = iq->requestEmptySlot();
     if (sample_view.isEmpty()) {
       throw std::runtime_error("sample_view is empty!");
     }
@@ -91,7 +91,7 @@ public:
   }
 
   virtual bool consumeIteration() {
-    auto iter_view = iq->requestFilled();
+    auto iter_view = iq->requestFilledSlot();
     if (iter_view.isEmpty()) {
       return false;
     }
@@ -227,7 +227,7 @@ TEST_P(IterQueueScenarios,
 
   auto producer = std::async(std::launch::async, [this]() {
     std::this_thread::sleep_for(50ms);
-    for (unsigned int i = 0u; i < DATA_SIXE; ++i) {
+    for (unsigned int i = 0u; i < DATA_SIZE; ++i) {
       produceSample(i);
     }
   });
@@ -330,7 +330,7 @@ TEST_P(IterQueueScenarios, produceAndConsumPartiallyFilledBatch_p) {
 
   iq->notifyEndOfRequestEmpty();
   {
-    auto iter_view = iq->requestFilled();
+    auto iter_view = iq->requestFilledSlot();
     if (iter_view.isEmpty()) {
       throw std::invalid_argument("iter view is empty!");
     }
@@ -416,7 +416,7 @@ TEST_P(IterQueueScenarios, caseThreeNotifyAfterTheLastBufferIsBeingFilled_p) {
     std::queue<nntrainer::ScopedView<nntrainer::Sample>> scoped_views;
     unsigned int number_of_producing = iq->batch() * iq->slots();
     for (unsigned int i = 0; i < number_of_producing; ++i) {
-      scoped_views.push(iq->requestEmpty());
+      scoped_views.push(iq->requestEmptySlot());
       if (scoped_views.back().isEmpty()) {
         throw std::runtime_error("sample was empty");
       }
@@ -455,7 +455,7 @@ TEST_P(IterQueueScenarios, caseFourNotifyAfterTheLastBufferIsBeingServed_p) {
     EXPECT_TRUE(consumeIteration());
   }
   {
-    auto iter_view = iq->requestFilled();
+    auto iter_view = iq->requestFilledSlot();
     notify_result =
       std::async(std::launch::async, [this] { iq->notifyEndOfRequestEmpty(); });
     if (iter_view.isEmpty()) {
@@ -487,7 +487,7 @@ TEST_P(IterQueueScenarios, notifyEndTwice_n) {
 
 TEST_P(IterQueueScenarios, notifyEndAndTryRequestEmpty_n) {
   iq->notifyEndOfRequestEmpty();
-  EXPECT_ANY_THROW(iq->requestEmpty());
+  EXPECT_ANY_THROW(iq->requestEmptySlot());
 }
 
 TEST_P(IterQueueScenarios, ScopedViewSampleHandlesThrowWhileFillingFails_n) {
@@ -498,14 +498,14 @@ TEST_P(IterQueueScenarios, ScopedViewSampleHandlesThrowWhileFillingFails_n) {
   auto request_fail = std::async(std::launch::async, [&, this] {
     t1_ready_promise.set_value();
     ready_future.wait();
-    auto sample_view = iq->requestEmpty();
+    auto sample_view = iq->requestEmptySlot();
     throw std::invalid_argument("while filling, it failed");
   });
 
   auto try_consume = std::async(std::launch::async, [&, this] {
     t2_ready_promise.set_value();
     ready_future.wait();
-    auto iter_view = iq->requestFilled();
+    auto iter_view = iq->requestFilledSlot();
     EXPECT_TRUE(iter_view.isEmpty());
   });
 
@@ -527,14 +527,14 @@ TEST_P(IterQueueScenarios, ScopedViewIterationHandlesThrowWhileFillingFails_n) {
     t1_ready_promise.set_value();
     ready_future.wait();
     for (unsigned i = 0; i < iq->batch(); ++i) {
-      auto sample_view = iq->requestEmpty();
+      auto sample_view = iq->requestEmptySlot();
     }
   });
 
   auto consume_fail = std::async(std::launch::async, [&, this] {
     t2_ready_promise.set_value();
     ready_future.wait();
-    auto iter_view = iq->requestFilled();
+    auto iter_view = iq->requestFilledSlot();
     throw std::invalid_argument("while using, it failed");
   });
 
@@ -544,8 +544,8 @@ TEST_P(IterQueueScenarios, ScopedViewIterationHandlesThrowWhileFillingFails_n) {
 
   EXPECT_THROW(consume_fail.get(), std::invalid_argument);
   feed_data.wait();
-  EXPECT_ANY_THROW(iq->requestEmpty());
-  EXPECT_TRUE(iq->requestFilled().isEmpty());
+  EXPECT_ANY_THROW(iq->requestEmptySlot());
+  EXPECT_TRUE(iq->requestFilledSlot().isEmpty());
 }
 
 IterQueueTestParamType multi_slot_multi_batch = {