9ae8522730363983623e82a43c0f4b9d3cd0d014
[platform/adaptation/npu/trix-engine.git] / src / core / ne-thread-pool.cc
1 /**
2  * Proprietary
3  * Copyright (C) 2020 Samsung Electronics
4  * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
5  */
6 /**
7  * @file ne-thread-pool.cc
8  * @date 27 Feb 2019
9  * @brief Source of thread pool to provide worker threads
10  * @author Dongju Chae <dongju.chae@samsung.com>
11  * @bug No known bugs except for NYI items
12  */
13
14 #include "ne-thread-pool.h"
15 #include <ne-conf.h>
16
17 #include <algorithm>
18
19 std::unique_ptr<ThreadPool> ThreadPool::instance_;
20 std::once_flag ThreadPool::once_flag_;
21
22 ThreadPool&
23 ThreadPool::getInstance () {
24   call_once (once_flag_, []() { instance_.reset (new ThreadPool); });
25   return *(instance_.get ());
26 }
27
28 /** @brief constructor of thread pool */
29 ThreadPool::ThreadPool () : stop_ (false) {
30   /** create workers with the number of threads configured */
31   num_threads_ = (Conf::getInstance ().getNumThreads ());
32   for (uint32_t i = 0; i < num_threads_; i++)
33     threads_.emplace_back ([this]() { this->worker (); });
34 }
35
36 /** @brief destructor of thread pool */
37 ThreadPool::~ThreadPool () {
38   /** terminate all workers */
39   {
40     std::unique_lock<std::mutex> lock (m_);
41     stop_ = true;
42   }
43   cv_.notify_all ();
44
45   for (auto& t : threads_) t.join ();
46 }
47
48 /** @brief worker main function */
49 void
50 ThreadPool::worker () {
51   while (1) {
52     std::unique_ptr<ThreadTask> task;
53     {
54       std::unique_lock<std::mutex> lock (m_);
55       cv_.wait (lock, [this]() { return should_stop () || !queue_.empty (); });
56
57       if (should_stop ())
58         break;
59
60       task = std::move (queue_.front ());
61       queue_.pop_front ();
62     }
63     /* without lock acquisition */
64     task->invoke ();
65   }
66 }
67
68 /**
69  * @brief enqueue task into the pool
70  * @param[in] task thread task instance
71  * @note one non-busy worker may handle this task
72  * @return 0 if no error, otherwise a negative errno
73  */
74 int
75 ThreadPool::enqueueTask (ThreadTask* task) {
76   if (task == nullptr)
77     return -EINVAL;
78
79   {
80     std::unique_lock<std::mutex> lock (m_);
81     std::deque<std::unique_ptr<ThreadTask>>::iterator it;
82     uint32_t task_id = task->getID ();
83
84     it = std::find_if (queue_.begin (), queue_.end (),
85                        [&task_id](const std::unique_ptr<ThreadTask>& t) {
86                          return t->getID () == task_id;
87                        });
88     /** does not allow the same ID */
89     if (it != queue_.end ())
90       return -EBUSY;
91
92     queue_.push_back (std::unique_ptr<ThreadTask> (task));
93   }
94
95   cv_.notify_one ();
96
97   return 0;
98 }
99
100 /**
101  * @brief remove task from the pool if it's not started yet
102  * @param[in] task_id task id indicating the target task to be removed
103  * @return 0 if no error. otherwise a negative errno
104  */
105 int
106 ThreadPool::removeTask (uint32_t task_id) {
107   std::unique_lock<std::mutex> lock (m_);
108   std::deque<std::unique_ptr<ThreadTask>>::iterator it;
109
110   it = std::find_if (queue_.begin (), queue_.end (),
111                      [&task_id](const std::unique_ptr<ThreadTask>& task) {
112                        return task->getID () == task_id;
113                      });
114   if (it == queue_.end ())
115     return -EBUSY;
116
117   queue_.erase (it);
118   return 0;
119 }