common taskscheduler: revise functionalities.
[platform/core/graphics/tizenvg.git] / src / lib / tvgTaskScheduler.cpp
1 /*
2  * Copyright (c) 2020 Samsung Electronics Co., Ltd. All rights reserved.
3
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10
11  * The above copyright notice and this permission notice shall be included in all
12  * copies or substantial portions of the Software.
13
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20  * SOFTWARE.
21  */
22 #include <deque>
23 #include <thread>
24 #include "tvgCommon.h"
25
26 /************************************************************************/
27 /* Internal Class Implementation                                        */
28 /************************************************************************/
29
30 namespace tvg {
31
32 struct TaskQueue {
33     deque<shared_ptr<Task>>  taskDeque;
34     mutex                    mtx;
35     condition_variable       ready;
36     bool                     done = false;
37
38     bool tryPop(shared_ptr<Task> &task)
39     {
40         unique_lock<mutex> lock{mtx, try_to_lock};
41         if (!lock || taskDeque.empty()) return false;
42         task = move(taskDeque.front());
43         taskDeque.pop_front();
44
45         return true;
46     }
47
48     bool tryPush(shared_ptr<Task> &&task)
49     {
50         {
51             unique_lock<mutex> lock{mtx, try_to_lock};
52             if (!lock) return false;
53             taskDeque.push_back(move(task));
54         }
55
56         ready.notify_one();
57
58         return true;
59     }
60
61     void complete()
62     {
63         {
64             unique_lock<mutex> lock{mtx};
65             done = true;
66         }
67         ready.notify_all();
68     }
69
70     bool pop(shared_ptr<Task> &task)
71     {
72         unique_lock<mutex> lock{mtx};
73
74         while (taskDeque.empty() && !done) {
75             ready.wait(lock);
76         }
77
78         if (taskDeque.empty()) return false;
79
80         task = move(taskDeque.front());
81         taskDeque.pop_front();
82
83         return true;
84     }
85
86     void push(shared_ptr<Task> &&task)
87     {
88         {
89             unique_lock<mutex> lock{mtx};
90             taskDeque.push_back(move(task));
91         }
92
93         ready.notify_one();
94     }
95
96 };
97
98
99 class TaskSchedulerImpl
100 {
101 public:
102     unsigned                       threadCnt;
103     vector<thread>                 threads;
104     vector<TaskQueue>              taskQueues{threadCnt};
105     atomic<unsigned>               idx{0};
106
107     TaskSchedulerImpl()
108     {
109         for (unsigned i = 0; i < threadCnt; ++i) {
110             threads.emplace_back([&, i] { run(i); });
111         }
112     }
113
114     ~TaskSchedulerImpl()
115     {
116         for (auto& queue : taskQueues) queue.complete();
117         for (auto& thread : threads) thread.join();
118     }
119
120     void run(unsigned i)
121     {
122         shared_ptr<Task> task;
123
124         //Thread Loop
125         while (true) {
126             auto success = false;
127
128             for (unsigned i = 0; i < threadCnt * 2; ++i) {
129                 if (taskQueues[(i + i) % threadCnt].tryPop(task)) {
130                     success = true;
131                     break;
132                 }
133             }
134
135             if (!success && !taskQueues[i].pop(task)) break;
136
137             (*task)();
138         }
139     }
140
141     void request(shared_ptr<Task> task)
142     {
143         //Async
144         if (threadCnt > 0) {
145             task->prepare();
146             auto i = idx++;
147             for (unsigned n = 0; n < threadCnt; ++n) {
148                 if (taskQueues[(i + n) % threadCnt].tryPush(move(task))) return;
149             }
150
151             taskQueues[i % threadCnt].push(move(task));
152
153         //Sync
154         } else {
155             task->run();
156         }
157     }
158 };
159
160 }
161
162 static TaskSchedulerImpl* inst = nullptr;
163
164 /************************************************************************/
165 /* External Class Implementation                                        */
166 /************************************************************************/
167
168 void TaskScheduler::init(unsigned threads)
169 {
170     if (inst) return;
171     inst = new TaskSchedulerImpl;
172     inst->threadCnt = threads;
173 }
174
175
176 void TaskScheduler::term()
177 {
178     if (!inst) return;
179     delete(inst);
180     inst = nullptr;
181 }
182
183
184 void TaskScheduler::request(shared_ptr<Task> task)
185 {
186     if (inst) {
187         inst->request(move(task));
188     }
189 }