This patch implements the lookahead behavior.
The lookahead property is used to preload cache elements in parallel.
An executor thread maintains the preloading task and notifies when it
finishes loading.
Signed-off-by: Jiho Chu <jiho.chu@samsung.com>
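
For context, a minimal sketch of how the lookahead path is meant to be
driven (the driver loop and names below are illustrative assumptions,
not part of this patch):

  // Hypothetical driver: with lookahead == 1, each call to
  // flushCacheExcept(order) first waits for any in-flight preload of
  // `order`, then launches an async preload for order + 1, so compute
  // for the current order overlaps with loading for the next one.
  for (unsigned int order = 0; order < last_exec_order; ++order) {
    manager.flushCacheExcept(order); // wait for `order`, preload order + 1
    runExecutionOrder(order);        // hypothetical compute step
  }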
* @param[in] enable_swap enable memory swap for tensor
* @param[in] swap_path memory swap file path when the swap is enabled
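+ * @param[in] lookahead lookahead size for preloading cache data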
*/
- NetworkGraph(bool enable_swap, const std::string &swap_path = "") :
- tensor_manager(std::make_shared<Manager>(enable_swap, swap_path)),
+ NetworkGraph(bool enable_swap, const std::string &swap_path = "",
+ unsigned int lookahead = 0) :
+ tensor_manager(
+ std::make_shared<Manager>(enable_swap, swap_path, lookahead)),
graph(),
compiled(false),
batch_size(0),
}
void Manager::flushCache() {
- weight_pool.flushCache();
- tensor_pool.flushCache();
+ if (!swap_lookahead) {
+ weight_pool.flushCache();
+ tensor_pool.flushCache();
+ }
}
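
With lookahead enabled, flushCache() is intentionally a no-op here,
presumably so that entries preloaded ahead of time are not discarded
wholesale; eviction then happens per execution order via
flushCacheExcept() below.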
void Manager::flushCacheExcept(unsigned int order) {
- weight_pool.flushCacheExcept(order);
- tensor_pool.flushCacheExcept(order);
+ auto loadAsync = [&](TensorPool &pool, unsigned int order) {
+ return pool.loadCacheExecAsync(
+ order, [&](int id, TaskExecutor::CompleteStatus status) {
+ std::scoped_lock<std::mutex> lock(completed_mutex);
+ completed[id].set_value(true);
+ });
+ };
+
+ auto waitComplete = [&](unsigned int o) {
+ auto &tasks = async_task_eos[o];
+
+ std::unique_lock<std::mutex> lock(completed_mutex);
+ auto w_fut = completed[std::get<0>(tasks)].get_future();
+ auto t_fut = completed[std::get<1>(tasks)].get_future();
+ lock.unlock();
+
+ w_fut.wait();
+ t_fut.wait();
+
+ async_task_eos.erase(o);
+ };
+
+  // TODO: support for lookahead > 1 is required.
+ if (swap_lookahead == 1) {
+ if (async_task_eos.count(order) == 1)
+ waitComplete(order);
+
+ auto load_weight = loadAsync(weight_pool, order + 1);
+ auto load_tensor = loadAsync(tensor_pool, order + 1);
+
+ NNTR_THROW_IF(load_weight < 0 || load_tensor < 0, std::runtime_error)
+ << "Failed to launch preloading task";
+ async_task_eos[order + 1] = std::make_tuple(load_weight, load_tensor);
+ } else {
+ weight_pool.flushCacheExcept(order);
+ tensor_pool.flushCacheExcept(order);
+ }
}
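
The completion handshake above is a plain promise/future pattern: the
executor's completion callback fulfils a promise keyed by task id, and
waitComplete() blocks on the matching futures. A self-contained sketch
of the same idea (names are illustrative, not the nntrainer API):

  #include <future>
  #include <map>
  #include <mutex>
  #include <thread>

  std::mutex completed_mutex;
  std::map<int, std::promise<bool>> completed;

  // Executor side: fulfil the promise for task `id` when loading ends.
  void onComplete(int id) {
    std::scoped_lock<std::mutex> lock(completed_mutex);
    completed[id].set_value(true); // operator[] creates the promise if absent
  }

  int main() {
    std::thread executor(onComplete, 0);

    // Waiter side: fetch the future under the lock, then wait unlocked,
    // mirroring waitComplete() above.
    std::unique_lock<std::mutex> lock(completed_mutex);
    auto fut = completed[0].get_future();
    lock.unlock();

    fut.wait(); // returns once onComplete() has run
    executor.join();
    return 0;
  }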
void Manager::finalizeTensorPool(TensorPool &pool, unsigned int start,
/**
* @brief Constructor of Manager
*/
- Manager(bool enable_swap, const std::string &swap_path = "") :
+ Manager(bool enable_swap, const std::string &swap_path = "",
+ unsigned int lookahead = 0) :
weight_pool(enable_swap, swap_path, "weight_pool"),
tensor_pool(enable_swap, swap_path, "tensor_pool"),
- enable_optimizations(true) {}
+ enable_optimizations(true),
+ swap_lookahead(lookahead) {}
/**
* @brief Construct a new Manager object (deleted)
* @brief flush cache data except the order
*
* @param order except execution order
+ * @note Preloading loads data for upcoming execution orders
+ *       asynchronously, up to the lookahead size. If a new flush
+ *       request arrives, it waits until the previous preloading
+ *       completes and then invokes a new one.
*/
void flushCacheExcept(unsigned int order);
TensorPool weight_pool; /**< tensor pool to request weights */
TensorPool tensor_pool; /**< tensor pool to request tensors */
+ std::map<unsigned int, std::tuple<int, int>> async_task_eos;
+ /**< async tasks <execution order, <weight_pool completed id, tensor_pool
+ * completed id>>
+ */
+ std::map<int, std::promise<bool>> completed;
+ /**< async tasks completion <task id, promise> */
+ std::mutex completed_mutex; /**< mutex for async tasks completion */
+
bool enable_optimizations; /**< to enable memory optimizations */
+  unsigned int swap_lookahead; /**< lookahead for memory swap */
+
/**
* @brief Finalize the given tensor pool
*
* 3. requestMemory for all the tensors and set their tokens
* @note +1 is to make the validity_end exclusive in the interval range
*/
- details->token =
- mem_pool->requestMemory(spec.tensor->bytes(), validity_start,
- validity_end + 1, details->exec_order);
+ details->token = mem_pool->requestMemory(
+ spec.tensor->bytes(), validity_start, validity_end + 1,
+ details->exec_order, details->lifespan);
#ifdef DEBUG
if (details->token == 0)
throw std::runtime_error("Received invalid token from memory pool");
spec.tensor->setData(mem_pool->getMemory(details->token), 0, true);
syncDependents(spec);
}
+
+ if (cache_loader)
+ cache_loader->init();
}
/**
* @brief Deallocate memory for all the managed tensors
*/
void TensorPool::deallocate() {
+ if (cache_loader)
+ cache_loader->finish();
+
mem_pool->deallocate();
/** nullify the data pointers for the tensors */
pool->flushExcept(order);
}
+void TensorPool::loadCacheExec(unsigned int order) {
+  if (dynamic_cast<CachePool *>(mem_pool.get()) != nullptr)
+ cache_loader->load(order);
+}
+
+int TensorPool::loadCacheExecAsync(
+ unsigned int order, TaskExecutor::CompleteCallback complete_callback) {
+  if (dynamic_cast<CachePool *>(mem_pool.get()) != nullptr)
+ return cache_loader->loadAsync(order, complete_callback);
+ else
+ return -1;
+}
+
+void TensorPool::loadCacheCancel(int id) {
+ if (dynamic_cast<CachePool *>(mem_pool.get()) == nullptr)
+ return;
+
+ cache_loader->cancelAsync(id);
+}
+
} // namespace nntrainer
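
A brief usage sketch of the new TensorPool entry points (the setup is
assumed; the callback type follows TaskExecutor::CompleteCallback used
above):

  // Assuming `pool` was constructed with enable_swap == true.
  int id = pool.loadCacheExecAsync(
    order, [](int task_id, TaskExecutor::CompleteStatus status) {
      // Runs on the executor thread once preloading for `order` ends.
    });

  if (id < 0) {
    // Pool is not cache-backed or the task failed to launch; fall back
    // to a synchronous load (a no-op for non-cache pools).
    pool.loadCacheExec(order);
  }

  // An in-flight preload that is no longer needed can be cancelled:
  // pool.loadCacheCancel(id);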
#include <variant>
#include <vector>
+#include <cache_loader.h>
#include <cache_pool.h>
#include <tensor.h>
#include <tensor_wrap_specs.h>
/**
* @brief Constructor of TensorPool
*/
- TensorPool() : mem_pool(std::make_unique<MemoryPool>()) {}
+ TensorPool() :
+ mem_pool(std::make_unique<MemoryPool>()),
+ cache_loader(nullptr) {}
/**
* @brief Constructor of TensorPool
*/
TensorPool(bool enable_swap, const std::string &swap_path = "",
const std::string &swap_name = "") {
- if (enable_swap)
- mem_pool = std::make_unique<CachePool>(swap_path, swap_name);
- else
- mem_pool = std::make_unique<MemoryPool>();
+ if (enable_swap) {
+ auto cache_pool = std::make_shared<CachePool>(swap_path, swap_name);
+ cache_loader = std::make_unique<CacheLoader>(cache_pool);
+ mem_pool = cache_pool;
+ } else {
+ mem_pool = std::make_shared<MemoryPool>();
+ }
}
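
The mem_pool member changes from std::unique_ptr to std::shared_ptr
because the CachePool is now co-owned: the TensorPool keeps it as its
memory pool while the CacheLoader holds a second reference for
background loading.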
/**
*/
void flushCacheExcept(unsigned int order);
+ /**
+ * @brief load cache data by execution order
+ *
+ * @param order execution order
+ */
+ void loadCacheExec(unsigned int order);
+
+ /**
+ * @brief load cache data asynchronously by execution order
+ *
+ * @param order execution order
+ * @param complete_callback callback invoked when loading completes
+ * @return async task id (negative on failure)
+ */
+ int loadCacheExecAsync(unsigned int order,
+ TaskExecutor::CompleteCallback complete_callback);
+
+ /**
+ * @brief cancel an async cache loading task
+ *
+ * @param id async task id
+ */
+ void loadCacheCancel(int id);
+
private:
/**
* @brief Source tensor detailed specification
std::vector<RequestSpec> pool; /**< list of requested tensors */
std::unordered_map<std::string, unsigned int>
name_map; /**< indexing of requested tensors */
- std::unique_ptr<MemoryPool> mem_pool; /**< memory pool for the tensors */
+ std::shared_ptr<MemoryPool> mem_pool; /**< memory pool for the tensors */
+  std::unique_ptr<CacheLoader> cache_loader; /**< cache loader for the tensor pool */
/**
* @brief Check if the lifespan leads to long term validity