[Dataset] Prepare q for the samplewise iteration
authorJihoon Lee <jhoon.it.lee@samsung.com>
Thu, 12 Aug 2021 07:35:24 +0000 (16:35 +0900)
committerJijoong Moon <jijoong.moon@samsung.com>
Thu, 26 Aug 2021 06:34:51 +0000 (15:34 +0900)
This patch prepares queue (`IterationQueue` to avoid naming conflict)
for the samplewise data feeding.

**Self evaluation:**
1. Build test: [X]Passed [ ]Failed [ ]Skipped
2. Run test: [X]Passed [ ]Failed [ ]Skipped

Signed-off-by: Jihoon Lee <jhoon.it.lee@samsung.com>
nntrainer/dataset/batch_queue.cpp
nntrainer/dataset/batch_queue.h
nntrainer/dataset/data_iteration.cpp
nntrainer/dataset/data_iteration.h

index 915268f..8fe4bf7 100644 (file)
@@ -68,4 +68,80 @@ bool BatchQueue::isEmpty() const {
   return q.empty();
 }
 
+IterationQueue::IterationQueue(
+  unsigned int num_slots, const std::vector<ml::train::TensorDim> &input_dims,
+  const std::vector<ml::train::TensorDim> &label_dims) {
+  iterations.reserve(num_slots);
+  for (decltype(num_slots) i = 0; i < num_slots; ++i) {
+    iterations.emplace_back(input_dims, label_dims, this);
+    empty_q.push(&iterations.back());
+  }
+}
+
+ScopedView<Sample> IterationQueue::requestEmpty() {
+  if (being_filled == nullptr) {
+    being_filled = empty_q.front();
+    empty_q.pop();
+    current_iterator = being_filled->get().begin();
+  } else {
+    current_iterator++;
+  }
+
+  auto view = ScopedView<Sample>(&(*current_iterator),
+                                 [this] { being_filled->markSampleFilled(); });
+
+  if (current_iterator + 1 == being_filled->get().end()) {
+    being_filled = nullptr;
+  }
+
+  return view;
+}
+
+ScopedView<Iteration> IterationQueue::requestFilled() {
+  auto iteration = filled_q.front();
+  filled_q.pop();
+  return ScopedView<Iteration>(&iteration->get(),
+                               [this, iteration] { markEmpty(iteration); });
+}
+
+IterationQueue::MarkableIteration::MarkableIteration(
+  const std::vector<ml::train::TensorDim> &input_dims,
+  const std::vector<ml::train::TensorDim> &label_dims, IterationQueue *iq) :
+  iteration(input_dims, label_dims),
+  iq(iq),
+  num_observed(0) {}
+
+IterationQueue::MarkableIteration::MarkableIteration(MarkableIteration &&rhs) :
+  iteration(std::move(rhs.iteration)),
+  iq(rhs.iq),
+  num_observed(rhs.num_observed) {}
+
+IterationQueue::MarkableIteration &IterationQueue::MarkableIteration::
+operator=(MarkableIteration &&rhs) {
+  if (this == &rhs) {
+    return *this;
+  }
+  std::swap(iteration, rhs.iteration);
+  std::swap(iq, rhs.iq);
+  std::swap(num_observed, rhs.num_observed);
+  return *this;
+}
+
+void IterationQueue::markFilled(MarkableIteration *iteration) /** noexcept */ {
+  filled_q.push(iteration);
+}
+
+void IterationQueue::markEmpty(MarkableIteration *iteration) /** noexcept */ {
+  empty_q.push(iteration);
+}
+
+void IterationQueue::MarkableIteration::markSampleFilled() {
+  std::lock_guard notify_lock_guard(notify_mutex);
+  num_observed++;
+  if (num_observed == iteration.batch()) {
+    iq->markFilled(this);
+    num_observed = 0;
+  }
+}
+
 } // namespace nntrainer
index fab06b9..0a48c94 100644 (file)
 #ifndef __BATCH_QUEUE_H__
 #define __BATCH_QUEUE_H__
 
-#include <queue>
-
 #include <condition_variable>
-#include <data_producer.h>
+#include <functional>
 #include <memory>
+#include <queue>
 #include <shared_mutex>
+#include <tuple>
 
+#include <data_iteration.h>
+#include <data_producer.h>
+#include <nntrainer_log.h>
+#include <tensor.h>
+#include <tensor_dim.h>
 namespace nntrainer {
 
 /**
@@ -91,11 +96,192 @@ private:
   std::condition_variable_any q_reader_cv;
   std::condition_variable_any q_writer_cv;
 
+  std::queue<std::unique_ptr<T>> q;
+};
+
+/**
+ * @brief A view container that calls a callback on destruct
+ * @note the callback must be noexcept, and the given underlying data must
+ * outlive the lifetime of this class
+ *
+ * @tparam T underlying type
+ */
+template <typename T> class ScopedView {
+public:
   /**
-   * @todo consider using circular buffer if this is too slow
+   * @brief Construct a new Scoped View object
    *
+   * @param data_ reference of the underlying data
+   * @param on_notify_ callback to be called on exit, this is not copied but
+   * reused
    */
-  std::queue<std::unique_ptr<T>> q;
+  ScopedView(T *data_, std::function<void(void)> &&on_notify_) :
+    data(data_),
+    on_notify(std::forward<std::function<void(void)>>(on_notify_)) {}
+
+  ScopedView(const ScopedView &rhs) = delete;
+  ScopedView &operator=(const ScopedView &rhs) = delete;
+
+  ScopedView(ScopedView &&rhs) = default;
+  ScopedView &operator=(ScopedView &&rhs) = default;
+
+  /**
+   * @brief Destroy the Scoped View object, callback is called at this time
+   *
+   */
+  ~ScopedView() {
+    try {
+      on_notify();
+    } catch (...) {
+      ml_loge("while notifiying, error happened");
+    }
+  }
+
+  /**
+   * @brief get the underlying data
+   *
+   * @return T & reference to the underlying data
+   */
+  T &get() { return *data; }
+
+  /**
+   * @brief get the underlying data
+   *
+   * @return T & reference to the underlying data
+   */
+  T const &get() const { return *data; }
+
+private:
+  T *data; /**< underlying data pointer */
+  std::function<void(void)>
+    on_notify; /**< called when destroyed without error */
+};
+
+/**
+ * @brief Iteration queue that owns the buffer for input / labels
+ * @detail
+ *
+ * - requestEmpty() will give a ScopedView<sample>
+ *     Destructing the returned object will notify the iteration that is done
+ * filling the sample. Once iteration is done filling, it will internally call
+ * IterationQueue::markFilled();
+ * - requestFilled() will give a ScopedView<Iteration>
+ *     Destructing this will notify the queue that is done used (internally
+ * calls IterationQueue::markEmpty())
+ *
+ * @todo apply this to the databuffer
+ * @todo prepare thread safe queue and apply
+ */
+class IterationQueue {
+public:
+  /**
+   * @brief Construct a new Iteration Queue object
+   * @note  input_dimension and label_dimension should include the batch, if
+   * IterationQueue::batch() is zero, it means it's invalid
+   * @param num_slots number of slots this batch queue will allocate, it should
+   * be buffersize/batchsize
+   * @param input_dims input dimensions
+   * @param label_dims label dimensions
+   */
+  IterationQueue(unsigned int num_slots,
+                 const std::vector<ml::train::TensorDim> &input_dims,
+                 const std::vector<ml::train::TensorDim> &label_dims);
+
+  /**
+   * @brief request empty sample from the queue.
+   * @note There is race condition between requesting empty, race condition with
+   * mark_ready should be handled by using MT_safe queue.
+   * @return ScopedView<Sample> sample view. Destroying the returned object will
+   * signal the queue that the sample is filled.
+   */
+  ScopedView<Sample> requestEmpty();
+
+  /**
+   * @brief request filled iteration from the queue.
+   * @note race condition here can be handled by using MT_safe queue
+   * @return ScopedView<Iteration> Ieration view. Destroying the returned object
+   * will signal the queue that the sample is done using.
+   */
+  ScopedView<Iteration> requestFilled();
+
+private:
+  /**
+   * @brief A wrapper object around @c Iteration which marks filled when filling
+   * sample is done
+   * @note the given @a iteration_ and @a bq_ must outleave the lifetime of this
+   * class
+   *
+   */
+  class MarkableIteration {
+  public:
+    /**
+     * @brief Construct a new Markable Iteration object
+     *
+     * @param input_dims input dimensions
+     * @param label_dims label dimensions
+     * @param iq_ iteration queue view to notify
+     */
+    MarkableIteration(const std::vector<ml::train::TensorDim> &input_dims,
+                      const std::vector<ml::train::TensorDim> &label_dims,
+                      IterationQueue *iq);
+
+    /**
+     * @brief Construct a new Markable Iteration object
+     *
+     * @param rhs right side to move
+     */
+    MarkableIteration(MarkableIteration &&rhs);
+
+    /**
+     * @brief Move Assignement operator
+     *
+     * @param rhs rhs to move
+     * @return MarkableIteration& markable iteration
+     */
+    MarkableIteration &operator=(MarkableIteration &&rhs);
+
+    /**
+     * @brief mark iteration that one sample is filled
+     * @todo make this function noexcept
+     */
+    void markSampleFilled() /** noexcept */;
+
+    /**
+     * @brief get underlying iteration
+     *
+     * @return Iteration& iteration
+     */
+    Iteration &get() { return iteration; }
+
+  private:
+    mutable std::mutex notify_mutex;
+    Iteration iteration;
+    IterationQueue *iq;
+    unsigned int num_observed;
+  };
+
+  /**
+   * @brief mark the given iteration filled
+   * @todo make this noexcept with the thread safe queue
+   * @param iteration iteration to mark it as filled
+   */
+  void markFilled(MarkableIteration *iteration) /** noexcept */;
+
+  /**
+   * @brief mark the given iteration empty
+   * @todo make this noexcept with the thread safe queue
+   * @param iteration iteration to mark it as emptied
+   */
+  void markEmpty(MarkableIteration *iteration) /** noexcept */;
+
+  std::vector<MarkableIteration> iterations; /** allocated iterations */
+  MarkableIteration *being_filled; /**< iteration that is being filled now */
+
+  std::vector<Sample>::iterator current_iterator;
+
+  /// @todo use mt safe queue
+  std::queue<MarkableIteration *> empty_q;  /** iterations to be filled */
+  std::queue<MarkableIteration *> filled_q; /** iterations to be served */
 };
 
 } // namespace nntrainer
index a66301b..8d410ef 100644 (file)
@@ -23,24 +23,6 @@ namespace nntrainer {
 namespace {
 
 /**
- * @brief return allocated tensors from dimensions
- *
- * @param dims dimensions
- * @return std::vector<Tensor> allocated tensors
- */
-std::vector<Tensor>
-tensorsFromDims(const std::vector<ml::train::TensorDim> &dims) {
-  std::vector<Tensor> t;
-  t.reserve(dims.size());
-
-  for (auto &dim : dims) {
-    t.emplace_back(dim);
-  }
-
-  return t;
-}
-
-/**
  * @brief check if all the dimension has the same batch, this is required
  * assumption for the creation of Iteration
  *
@@ -95,8 +77,8 @@ std::vector<Sample> unpackIteration(Iteration &iter) {
 
 Iteration::Iteration(const std::vector<ml::train::TensorDim> &input_dims,
                      const std::vector<ml::train::TensorDim> &label_dims) :
-  inputs(tensorsFromDims(input_dims)),
-  labels(tensorsFromDims(label_dims)) {
+  inputs(input_dims.begin(), input_dims.end()),
+  labels(label_dims.begin(), label_dims.end()) {
 
   NNTR_THROW_IF(!isBatchSame(input_dims, label_dims), std::invalid_argument)
     << "check batch size is all the same for all the input and label";
index e8d8e32..5458673 100644 (file)
@@ -35,7 +35,7 @@ public:
   /**
    * @brief Construct a new Iteration object
    * @note the batch dimension must be the same for all given dimensions and the
-   * first input must not be empty
+   * there must be at least one input
    *
    * @param input_dims input dimension
    * @param label_dims label dimension
@@ -102,7 +102,7 @@ public:
    *
    * @return std::vector<Sample>::const_iterator
    */
-  std::vector<Sample>::const_iterator begin() const { return samples.end(); }
+  std::vector<Sample>::const_iterator begin() const { return samples.begin(); }
 
   /**
    * @brief get sample iterator end