Merge "Ability to retrieve all the property values of a handle" into devel/master
[platform/core/uifw/dali-core.git] / dali / devel-api / threading / thread-pool.cpp
1 /*
2  * Copyright (c) 2018 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 #include "thread-pool.h"
19 #include <cmath>
20
21
22
23 namespace Dali
24 {
25 namespace
26 {
27 template<typename T, typename... Args>
28 std::unique_ptr<T> make_unique(Args&&... args)
29 {
30   return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
31 }
32 }
33
34 /**
35  * WorkerThread executes tasks submitted to the pool
36  */
37 class WorkerThread
38 {
39 public:
40
41   /**
42    * @brief Constructor of worker thread
43    * @param index Thread index assigned to the object during pool initialisation
44    */
45   explicit WorkerThread( uint32_t index );
46
47   /**
48    * @brief Destructor of the worker thread
49    */
50   ~WorkerThread();
51
52   WorkerThread(const WorkerThread &other) = delete;
53   WorkerThread &operator=(const WorkerThread &other) = delete;
54
55   /**
56    * @brief Adds task to the task queue
57    * @param task Task to be executed by the thread
58    */
59   void AddTask( Task task );
60
61   /**
62    * @brief Wakes up thread.
63    */
64   void Notify();
65
66   /**
67    * @brief Waits for the thread to complete all the tasks currently in the queue.
68    */
69   void Wait();
70
71 private:
72
73   /**
74    * @brief Internal thread loop function
75    */
76   void WaitAndExecute();
77
78   std::thread   mWorker;
79   uint32_t      mIndex;
80   TaskQueue     mTaskQueue;
81   std::mutex    mTaskQueueMutex;
82   std::condition_variable mConditionVariable;
83
84   bool mTerminating {false} ;
85 };
86
87 void WorkerThread::WaitAndExecute()
88 {
89   while( true )
90   {
91     Task task;
92
93     {
94       std::unique_lock< std::mutex > lock{ mTaskQueueMutex };
95
96       mConditionVariable.wait( lock, [ this ]() -> bool {
97         return !mTaskQueue.empty() || mTerminating;
98       } );
99
100       if( mTerminating )
101       {
102         break;
103       }
104
105       task = mTaskQueue.front();
106     }
107
108     task( mIndex );
109
110     {
111       std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
112
113       mTaskQueue.pop();
114
115       mConditionVariable.notify_one();
116     }
117   }
118 }
119
120 WorkerThread::WorkerThread(uint32_t index) : mIndex( index )
121 {
122   // Have to pass "this" as an argument because WaitAndExecute is a member function.
123   mWorker = std::thread{ &WorkerThread::WaitAndExecute, this };
124 }
125
126 WorkerThread::~WorkerThread()
127 {
128   if( mWorker.joinable() )
129   {
130     Notify();
131     Wait();
132
133     {
134       std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
135       mTerminating = true;
136       mConditionVariable.notify_one();
137     }
138
139     mWorker.join();
140   }
141 }
142
143 void WorkerThread::AddTask( Task task )
144 {
145   std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
146   mTaskQueue.push( std::move( task ) );
147   mConditionVariable.notify_one();
148 }
149
150 void WorkerThread::Notify()
151 {
152   std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
153   mConditionVariable.notify_one();
154 }
155
156 void WorkerThread::Wait()
157 {
158   std::unique_lock< std::mutex > lock{ mTaskQueueMutex };
159   mConditionVariable.wait( lock, [ this ]() -> bool {
160     return mTaskQueue.empty();
161   } );
162 }
163
164 // ThreadPool -----------------------------------------------------------------------------------------------
165
166 struct ThreadPool::Impl
167 {
168   std::vector<std::unique_ptr<WorkerThread>> mWorkers;
169   uint32_t                                   mWorkerIndex{ 0u };
170 };
171
172 ThreadPool::ThreadPool()
173 {
174   mImpl = make_unique<Impl>();
175 }
176
177 ThreadPool::~ThreadPool() = default;
178
179 bool ThreadPool::Initialize( uint32_t threadCount )
180 {
181   /**
182    * Get the system's supported thread count.
183    */
184   auto thread_count = threadCount + 1;
185   if( !threadCount )
186   {
187     thread_count = std::thread::hardware_concurrency();
188     if( !thread_count )
189     {
190       return false;
191     }
192   }
193
194   /**
195    * Spawn the worker threads.
196    */
197   for( auto i = 0u; i < thread_count - 1; i++ )
198   {
199     /**
200     * The workers will execute an infinite loop function
201     * and will wait for a job to enter the job queue. Once a job is in the the queue
202     * the threads will wake up to acquire and execute it.
203     */
204     mImpl->mWorkers.push_back( make_unique< WorkerThread >( i ) );
205   }
206
207   return true;
208 }
209
210
211 void ThreadPool::Wait()
212 {
213   for( auto& worker : mImpl->mWorkers )
214   {
215     worker->Wait();
216   }
217 }
218
219 SharedFuture ThreadPool::SubmitTask( uint32_t workerIndex, const Task& task )
220 {
221   auto future = std::shared_ptr< Future< void > >( new Future< void > );
222   mImpl->mWorkers[workerIndex]->AddTask( [task, future]( uint32_t index )
223                                   {
224                                     task( index );
225
226                                     future->mPromise.set_value();
227                                   });
228
229   return future;
230 }
231
232 SharedFuture ThreadPool::SubmitTasks( const std::vector< Task >& tasks )
233 {
234   auto future = std::shared_ptr< Future< void > >( new Future< void > );
235
236   mImpl->mWorkers[ mImpl->mWorkerIndex++ % static_cast< uint32_t >( mImpl->mWorkers.size() )]->AddTask(
237     [ future, tasks ]( uint32_t index ) {
238       for( auto& task : tasks )
239       {
240         task( index );
241       }
242
243       future->mPromise.set_value();
244
245     } );
246
247   return future;
248 }
249
250 UniqueFutureGroup ThreadPool::SubmitTasks( const std::vector< Task >& tasks, uint32_t threadMask )
251 {
252   auto retval = make_unique<FutureGroup<void>>();
253
254   /**
255    * Use square root of number of sumbitted tasks to estimate optimal number of threads
256    * used to execute jobs
257    */
258   auto threads = uint32_t(std::log2(float(tasks.size())));
259
260   if( threadMask != 0 )
261   {
262     threads = threadMask;
263   }
264
265   if( threads > mImpl->mWorkers.size() )
266   {
267     threads = uint32_t(mImpl->mWorkers.size());
268   }
269   else if( !threads )
270   {
271     threads = 1;
272   }
273
274   auto payloadPerThread = uint32_t(tasks.size() / threads);
275   auto remaining = uint32_t(tasks.size() % threads);
276
277   uint32_t taskIndex = 0;
278   uint32_t taskSize = uint32_t(remaining + payloadPerThread); // add 'remaining' tasks to the very first job list
279
280   for( auto wt = 0u; wt < threads; ++wt )
281   {
282     auto future = std::shared_ptr< Future< void > >( new Future< void > );
283     retval->mFutures.emplace_back( future );
284     mImpl->mWorkers[ mImpl->mWorkerIndex++ % static_cast< uint32_t >( mImpl->mWorkers.size() )]->AddTask(
285       [ future, tasks, taskIndex, taskSize ]( uint32_t index ) {
286         auto begin = tasks.begin() + int(taskIndex);
287         auto end = begin + int(taskSize);
288         for( auto it = begin; it < end; ++it )
289         {
290           (*it)( index );
291         }
292         future->mPromise.set_value();
293       } );
294
295     taskIndex += taskSize;
296     taskSize = payloadPerThread;
297   }
298
299   return retval;
300 }
301
302 size_t ThreadPool::GetWorkerCount() const
303 {
304   return mImpl->mWorkers.size();
305 }
306
307 } //namespace Dali