[dali_2.3.25] Merge branch 'devel/master'
[platform/core/uifw/dali-core.git] / dali / devel-api / threading / thread-pool.cpp
1 /*
2  * Copyright (c) 2020 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 #include "thread-pool.h"
19 #include <cmath>
20
21 namespace Dali
22 {
23 namespace
24 {
25 template<typename T, typename... Args>
26 std::unique_ptr<T> make_unique(Args&&... args)
27 {
28   return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
29 }
30 } // namespace
31
32 /**
33  * WorkerThread executes tasks submitted to the pool
34  */
35 class WorkerThread
36 {
37 public:
38   /**
39    * @brief Constructor of worker thread
40    * @param index Thread index assigned to the object during pool initialisation
41    */
42   explicit WorkerThread(uint32_t index);
43
44   /**
45    * @brief Destructor of the worker thread
46    */
47   ~WorkerThread();
48
49   WorkerThread(const WorkerThread& other) = delete;
50   WorkerThread& operator=(const WorkerThread& other) = delete;
51
52   /**
53    * @brief Adds task to the task queue
54    * @param task Task to be executed by the thread
55    */
56   void AddTask(Task task);
57
58   /**
59    * @brief Wakes up thread.
60    */
61   void Notify();
62
63   /**
64    * @brief Waits for the thread to complete all the tasks currently in the queue.
65    */
66   void Wait();
67
68 private:
69   /**
70    * @brief Internal thread loop function
71    */
72   void WaitAndExecute();
73
74   std::thread             mWorker;
75   uint32_t                mIndex;
76   TaskQueue               mTaskQueue;
77   std::mutex              mTaskQueueMutex;
78   std::condition_variable mConditionVariable;
79
80   bool mTerminating{false};
81 };
82
83 void WorkerThread::WaitAndExecute()
84 {
85   while(true)
86   {
87     Task task;
88
89     {
90       std::unique_lock<std::mutex> lock{mTaskQueueMutex};
91
92       mConditionVariable.wait(lock, [this]() -> bool {
93         return !mTaskQueue.empty() || mTerminating;
94       });
95
96       if(mTerminating)
97       {
98         break;
99       }
100
101       task = mTaskQueue.front();
102     }
103
104     task(mIndex);
105
106     {
107       std::lock_guard<std::mutex> lock{mTaskQueueMutex};
108
109       mTaskQueue.pop();
110
111       mConditionVariable.notify_one();
112     }
113   }
114 }
115
116 WorkerThread::WorkerThread(uint32_t index)
117 : mIndex(index)
118 {
119   // Have to pass "this" as an argument because WaitAndExecute is a member function.
120   mWorker = std::thread{&WorkerThread::WaitAndExecute, this};
121 }
122
123 WorkerThread::~WorkerThread()
124 {
125   if(mWorker.joinable())
126   {
127     Notify();
128     Wait();
129
130     {
131       std::lock_guard<std::mutex> lock{mTaskQueueMutex};
132       mTerminating = true;
133       mConditionVariable.notify_one();
134     }
135
136     mWorker.join();
137   }
138 }
139
140 void WorkerThread::AddTask(Task task)
141 {
142   std::lock_guard<std::mutex> lock{mTaskQueueMutex};
143   mTaskQueue.push(std::move(task));
144   mConditionVariable.notify_one();
145 }
146
147 void WorkerThread::Notify()
148 {
149   std::lock_guard<std::mutex> lock{mTaskQueueMutex};
150   mConditionVariable.notify_one();
151 }
152
153 void WorkerThread::Wait()
154 {
155   std::unique_lock<std::mutex> lock{mTaskQueueMutex};
156   mConditionVariable.wait(lock, [this]() -> bool {
157     return mTaskQueue.empty();
158   });
159 }
160
161 // ThreadPool -----------------------------------------------------------------------------------------------
162
163 struct ThreadPool::Impl
164 {
165   std::vector<std::unique_ptr<WorkerThread>> mWorkers;
166   uint32_t                                   mWorkerIndex{0u};
167 };
168
169 ThreadPool::ThreadPool()
170 {
171   mImpl = make_unique<Impl>();
172 }
173
174 ThreadPool::~ThreadPool() = default;
175
176 bool ThreadPool::Initialize(uint32_t threadCount)
177 {
178   /**
179    * Get the system's supported thread count.
180    */
181   auto thread_count = threadCount + 1;
182   if(!threadCount)
183   {
184     thread_count = std::thread::hardware_concurrency();
185     if(!thread_count)
186     {
187       return false;
188     }
189   }
190
191   /**
192    * Spawn the worker threads.
193    */
194   for(auto i = 0u; i < thread_count - 1; i++)
195   {
196     /**
197     * The workers will execute an infinite loop function
198     * and will wait for a job to enter the job queue. Once a job is in the the queue
199     * the threads will wake up to acquire and execute it.
200     */
201     mImpl->mWorkers.push_back(make_unique<WorkerThread>(i));
202   }
203
204   return true;
205 }
206
207 void ThreadPool::Wait()
208 {
209   for(auto& worker : mImpl->mWorkers)
210   {
211     worker->Wait();
212   }
213 }
214
215 SharedFuture ThreadPool::SubmitTask(uint32_t workerIndex, const Task& task)
216 {
217   auto future = std::shared_ptr<Future<void>>(new Future<void>);
218   mImpl->mWorkers[workerIndex]->AddTask([task, future](uint32_t index) {
219     task(index);
220
221     future->mPromise.set_value();
222   });
223
224   return future;
225 }
226
227 SharedFuture ThreadPool::SubmitTasks(const std::vector<Task>& tasks)
228 {
229   auto future = std::shared_ptr<Future<void>>(new Future<void>);
230
231   mImpl->mWorkers[mImpl->mWorkerIndex++ % static_cast<uint32_t>(mImpl->mWorkers.size())]->AddTask(
232     [future, tasks](uint32_t index) {
233       for(auto& task : tasks)
234       {
235         task(index);
236       }
237
238       future->mPromise.set_value();
239     });
240
241   return future;
242 }
243
244 UniqueFutureGroup ThreadPool::SubmitTasks(const std::vector<Task>& tasks, uint32_t threadMask)
245 {
246   auto retval = make_unique<FutureGroup<void>>();
247
248   /**
249    * Use square root of number of sumbitted tasks to estimate optimal number of threads
250    * used to execute jobs
251    */
252   auto threads = uint32_t(std::log2(float(tasks.size())));
253
254   if(threadMask != 0)
255   {
256     threads = threadMask;
257   }
258
259   if(threads > mImpl->mWorkers.size())
260   {
261     threads = uint32_t(mImpl->mWorkers.size());
262   }
263   else if(!threads)
264   {
265     threads = 1;
266   }
267
268   auto payloadPerThread = uint32_t(tasks.size() / threads);
269   auto remaining        = uint32_t(tasks.size() % threads);
270
271   uint32_t taskIndex = 0;
272   uint32_t taskSize  = uint32_t(remaining + payloadPerThread); // add 'remaining' tasks to the very first job list
273
274   for(auto wt = 0u; wt < threads; ++wt)
275   {
276     auto future = std::shared_ptr<Future<void>>(new Future<void>);
277     retval->mFutures.emplace_back(future);
278     mImpl->mWorkers[mImpl->mWorkerIndex++ % static_cast<uint32_t>(mImpl->mWorkers.size())]->AddTask(
279       [future, tasks, taskIndex, taskSize](uint32_t index) {
280         auto begin = tasks.begin() + int(taskIndex);
281         auto end   = begin + int(taskSize);
282         for(auto it = begin; it < end; ++it)
283         {
284           (*it)(index);
285         }
286         future->mPromise.set_value();
287       });
288
289     taskIndex += taskSize;
290     taskSize = payloadPerThread;
291   }
292
293   return retval;
294 }
295
296 size_t ThreadPool::GetWorkerCount() const
297 {
298   return mImpl->mWorkers.size();
299 }
300
301 } //namespace Dali