[SWAP] Implement lookahead behavior
author: Jiho Chu <jiho.chu@samsung.com>
Wed, 21 Dec 2022 01:29:33 +0000 (10:29 +0900)
committer: Jijoong Moon <jijoong.moon@samsung.com>
Thu, 9 Feb 2023 22:56:24 +0000 (07:56 +0900)
This patch implements lookahead behavior.
The lookahead property is used to preload cache elements in parallel.
The executor thread maintains the preloading tasks and notifies when
loading finishes.

Signed-off-by: Jiho Chu <jiho.chu@samsung.com>
nntrainer/graph/network_graph.h
nntrainer/tensor/manager.cpp
nntrainer/tensor/manager.h
nntrainer/tensor/tensor_pool.cpp
nntrainer/tensor/tensor_pool.h
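
For context, the lookahead value flows from NetworkGraph into Manager and
down to the tensor pools. A minimal usage sketch, assuming swap is enabled
(the constructor signature matches this patch; the path is illustrative):

    // Preload cache elements one execution order ahead.
    nntrainer::NetworkGraph graph(/*enable_swap=*/true,
                                  /*swap_path=*/"/tmp/nntrainer_swap",
                                  /*lookahead=*/1);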

index 55f0c08..0ba38d5 100644 (file)
@@ -55,8 +55,11 @@ public:
    * @param[in] enable_swap enable memory swap for tensor
    * @param[in] swap_path memory swap file path when the swap is enabled
+   * @param[in] lookahead lookahead size for preloading cache elements
    */
-  NetworkGraph(bool enable_swap, const std::string &swap_path = "") :
-    tensor_manager(std::make_shared<Manager>(enable_swap, swap_path)),
+  NetworkGraph(bool enable_swap, const std::string &swap_path = "",
+               unsigned int lookahead = 0) :
+    tensor_manager(
+      std::make_shared<Manager>(enable_swap, swap_path, lookahead)),
     graph(),
     compiled(false),
     batch_size(0),
index ac70455..7b4bfb3 100644 (file)
@@ -630,13 +630,50 @@ Manager::getWeights(const std::function<bool(const Weight *)> &condition) {
 }
 
 void Manager::flushCache() {
-  weight_pool.flushCache();
-  tensor_pool.flushCache();
+  if (!swap_lookahead) {
+    weight_pool.flushCache();
+    tensor_pool.flushCache();
+  }
 }
 
 void Manager::flushCacheExcept(unsigned int order) {
-  weight_pool.flushCacheExcept(order);
-  tensor_pool.flushCacheExcept(order);
+  auto loadAsync = [&](TensorPool &pool, unsigned int order) {
+    return pool.loadCacheExecAsync(
+      order, [&](int id, TaskExecutor::CompleteStatus status) {
+        std::scoped_lock<std::mutex> lock(completed_mutex);
+        completed[id].set_value(true);
+      });
+  };
+
+  auto waitComplete = [&](unsigned int o) {
+    auto &tasks = async_task_eos[o];
+
+    std::unique_lock<std::mutex> lock(completed_mutex);
+    auto w_fut = completed[std::get<0>(tasks)].get_future();
+    auto t_fut = completed[std::get<1>(tasks)].get_future();
+    lock.unlock();
+
+    w_fut.wait();
+    t_fut.wait();
+
+    async_task_eos.erase(o);
+  };
+
+  // TODO: support lookahead > 1.
+  if (swap_lookahead == 1) {
+    if (async_task_eos.count(order) == 1)
+      waitComplete(order);
+
+    auto load_weight = loadAsync(weight_pool, order + 1);
+    auto load_tensor = loadAsync(tensor_pool, order + 1);
+
+    NNTR_THROW_IF(load_weight < 0 || load_tensor < 0, std::runtime_error)
+      << "Failed to launch preloading task";
+    async_task_eos[order + 1] = std::make_tuple(load_weight, load_tensor);
+  } else {
+    weight_pool.flushCacheExcept(order);
+    tensor_pool.flushCacheExcept(order);
+  }
 }
 
 void Manager::finalizeTensorPool(TensorPool &pool, unsigned int start,
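
The pattern above, in isolation: with lookahead == 1, each flush first
waits for the preload previously issued for the current order, then
launches preloads for order + 1 on both pools. A self-contained sketch of
the promise/future handshake, with the pools and the TaskExecutor replaced
by plain threads (ids and timing are simulated):

    #include <future>
    #include <iostream>
    #include <map>
    #include <mutex>
    #include <thread>
    #include <vector>

    int main() {
      std::map<int, std::promise<bool>> completed;
      std::mutex completed_mutex;
      std::vector<std::thread> workers;

      // "Launch" a preload; the worker fulfills its promise when done,
      // standing in for the TaskExecutor completion callback.
      auto launch = [&](int id) {
        workers.emplace_back([&completed, &completed_mutex, id]() {
          std::scoped_lock lock(completed_mutex);
          completed[id].set_value(true);
        });
        return id;
      };

      int load_weight = launch(0); // weight_pool preload for order + 1
      int load_tensor = launch(1); // tensor_pool preload for order + 1

      // waitComplete(): fetch both futures under the lock, then block.
      std::unique_lock lock(completed_mutex);
      auto w_fut = completed[load_weight].get_future();
      auto t_fut = completed[load_tensor].get_future();
      lock.unlock();
      w_fut.wait();
      t_fut.wait();

      for (auto &w : workers)
        w.join();
      std::cout << "preload for the next order finished" << std::endl;
    }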
index 455baa4..e4d1a36 100644 (file)
@@ -135,10 +135,12 @@ public:
   /**
    * @brief     Constructor of Manager
    */
-  Manager(bool enable_swap, const std::string &swap_path = "") :
+  Manager(bool enable_swap, const std::string &swap_path = "",
+          unsigned int lookahead = 0) :
     weight_pool(enable_swap, swap_path, "weight_pool"),
     tensor_pool(enable_swap, swap_path, "tensor_pool"),
-    enable_optimizations(true) {}
+    enable_optimizations(true),
+    swap_lookahead(lookahead) {}
 
   /**
    * @brief Construct a new Manager object (deleted)
@@ -453,6 +455,10 @@ public:
    * @brief flush cache data except the order
    *
    * @param order except execution order
+   * @note preloading loads data for upcoming execution orders
+   *       asynchronously, up to the lookahead size. If a new flush
+   *       request arrives, it waits until the previous preloading is
+   *       completed and then invokes a new one.
    */
   void flushCacheExcept(unsigned int order);
 
@@ -473,8 +479,18 @@ private:
   TensorPool weight_pool; /**< tensor pool to request tensors */
   TensorPool tensor_pool; /**< tensor pool to request tensors */
 
+  std::map<unsigned int, std::tuple<int, int>> async_task_eos;
+  /**< async tasks <execution order, <weight_pool completed id, tensor_pool
+   * completed id>>
+   */
+  std::map<int, std::promise<bool>> completed;
+  /**< async tasks completion <task id, promise> */
+  std::mutex completed_mutex; /**< mutex for async tasks completion */
+
   bool enable_optimizations; /**< to enable memory optimizations */
 
+  unsigned int swap_lookahead; /**< lookahead for memory swap */
+
   /**
    * @brief Finalize the given tensor pool
    *
index 70ce920..81b8ce9 100644 (file)
@@ -154,9 +154,9 @@ void TensorPool::finalize(const MemoryPlanner &planner,
      * 3. requestMemory for all the tensors and set their tokens
     * @note +1 is to make the validity_end exclusive in the interval range
      */
-    details->token =
-      mem_pool->requestMemory(spec.tensor->bytes(), validity_start,
-                              validity_end + 1, details->exec_order);
+    details->token = mem_pool->requestMemory(
+      spec.tensor->bytes(), validity_start, validity_end + 1,
+      details->exec_order, details->lifespan);
 #ifdef DEBUG
     if (details->token == 0)
       throw std::runtime_error("Received invalid token from memory pool");
@@ -199,12 +199,18 @@ void TensorPool::allocate() {
     spec.tensor->setData(mem_pool->getMemory(details->token), 0, true);
     syncDependents(spec);
   }
+
+  if (cache_loader)
+    cache_loader->init();
 }
 
 /**
  * @brief Deallocate memory for all the managed tensors
  */
 void TensorPool::deallocate() {
+  if (cache_loader)
+    cache_loader->finish();
+
   mem_pool->deallocate();
 
   /** nullify the data pointers for the tensors */
@@ -420,4 +426,24 @@ void TensorPool::flushCacheExcept(unsigned int order) {
     pool->flushExcept(order);
 }
 
+void TensorPool::loadCacheExec(unsigned int order) {
+  if (auto pool = dynamic_cast<CachePool *>(mem_pool.get()))
+    cache_loader->load(order);
+}
+
+int TensorPool::loadCacheExecAsync(
+  unsigned int order, TaskExecutor::CompleteCallback complete_callback) {
+  if (auto pool = dynamic_cast<CachePool *>(mem_pool.get()))
+    return cache_loader->loadAsync(order, complete_callback);
+  else
+    return -1;
+}
+
+void TensorPool::loadCacheCancel(int id) {
+  if (dynamic_cast<CachePool *>(mem_pool.get()) == nullptr)
+    return;
+
+  cache_loader->cancelAsync(id);
+}
+
 } // namespace nntrainer
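
A usage sketch for the new entry points, assuming a TensorPool constructed
with swap enabled; the helper name is hypothetical, and the nntrainer
namespace qualification on TaskExecutor is an assumption:

    #include <tensor_pool.h>

    // Hypothetical helper: start preloading `order` and return the task
    // id, or -1 when swap is disabled and there is no CachePool to load.
    int preloadOrder(nntrainer::TensorPool &pool, unsigned int order) {
      return pool.loadCacheExecAsync(
        order, [](int task_id, nntrainer::TaskExecutor::CompleteStatus status) {
          // invoked on the executor thread once loading finishes
        });
    }

A negative id means nothing was scheduled; a valid id can later be passed
to loadCacheCancel(id) to drop an in-flight preload.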
index a4264a6..e16b0d2 100644 (file)
@@ -23,6 +23,7 @@
 #include <variant>
 #include <vector>
 
+#include <cache_loader.h>
 #include <cache_pool.h>
 #include <tensor.h>
 #include <tensor_wrap_specs.h>
@@ -41,17 +42,22 @@ public:
   /**
    * @brief     Constructor of TensorPool
    */
-  TensorPool() : mem_pool(std::make_unique<MemoryPool>()) {}
+  TensorPool() :
+    mem_pool(std::make_unique<MemoryPool>()),
+    cache_loader(nullptr) {}
 
   /**
    * @brief     Constructor of TensorPool
    */
   TensorPool(bool enable_swap, const std::string &swap_path = "",
              const std::string &swap_name = "") {
-    if (enable_swap)
-      mem_pool = std::make_unique<CachePool>(swap_path, swap_name);
-    else
-      mem_pool = std::make_unique<MemoryPool>();
+    if (enable_swap) {
+      auto cache_pool = std::make_shared<CachePool>(swap_path, swap_name);
+      cache_loader = std::make_unique<CacheLoader>(cache_pool);
+      mem_pool = cache_pool;
+    } else {
+      mem_pool = std::make_shared<MemoryPool>();
+    }
   }
 
   /**
@@ -258,6 +264,30 @@
    */
   void flushCacheExcept(unsigned int order);
 
+  /**
+   * @brief load cache data by execution order
+   *
+   * @param order execution order
+   */
+  void loadCacheExec(unsigned int order);
+
+  /**
+   * @brief load cache data asynchronously by execution order
+   *
+   * @param order execution order
+   * @param complete_callback callback invoked when loading completes
+   * @return async task id
+   */
+  int loadCacheExecAsync(unsigned int order,
+                         TaskExecutor::CompleteCallback complete_callback);
+
+  /**
+   * @brief cancel a preloading task
+   *
+   * @param id async task id
+   */
+  void loadCacheCancel(int id);
+
 private:
   /**
    * @brief Source tensor detailed specification
@@ -353,7 +382,8 @@ private:
   std::vector<RequestSpec> pool; /**< list of requested tensors */
   std::unordered_map<std::string, unsigned int>
     name_map;                           /**< indexing of requested tensors */
-  std::unique_ptr<MemoryPool> mem_pool; /**< memory pool for the tensors */
+  std::shared_ptr<MemoryPool> mem_pool; /**< memory pool for the tensors */
+  std::unique_ptr<CacheLoader> cache_loader; /**< cache loader for the pool */
 
   /**
   * @brief     Check if the lifespan leads to long term validity
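
One design note on the last hunk: mem_pool changes from std::unique_ptr to
std::shared_ptr because, with swap enabled, the CachePool now has two
users: the pool itself and the CacheLoader that preloads from it. A
reduced sketch of that ownership shape (types are placeholders, not the
real classes):

    #include <memory>

    struct MemoryPool { virtual ~MemoryPool() = default; };
    struct CachePool : MemoryPool {};
    struct CacheLoader {
      explicit CacheLoader(std::shared_ptr<CachePool> p) : pool(std::move(p)) {}
      std::shared_ptr<CachePool> pool;
    };

    int main() {
      // Both the tensor pool and the loader keep the CachePool alive.
      auto cache_pool = std::make_shared<CachePool>();
      auto loader = std::make_unique<CacheLoader>(cache_pool);
      std::shared_ptr<MemoryPool> mem_pool = cache_pool; // upcast, shared
    }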