2 * Copyright (c) 2020 Samsung Electronics Co., Ltd. All rights reserved.
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 #include "tvgCommon.h"
26 /************************************************************************/
27 /* Internal Class Implementation */
28 /************************************************************************/
// Pending tasks in FIFO order: producers append at the back, consumers take from the front.
33 deque<shared_ptr<Task>> taskDeque;
// Condition variable belonging to this queue.
// NOTE(review): presumably signalled when a task is queued and waited on by pop() — confirm at the wait/notify call sites.
35 condition_variable ready;
// Non-blocking pop.
// Tries to take mtx without waiting (try_to_lock). Returns false when the lock is
// currently held elsewhere or when the deque is empty; otherwise the front task is
// moved into `task` and removed from the deque.
38 bool tryPop(shared_ptr<Task> &task)
40 unique_lock<mutex> lock{mtx, try_to_lock};
// `!lock` is true when try_to_lock failed to acquire the mutex.
41 if (!lock || taskDeque.empty()) return false;
42 task = move(taskDeque.front());
43 taskDeque.pop_front();
// Non-blocking push.
// Tries to take mtx without waiting (try_to_lock). Returns false, leaving `task`
// untouched, when the lock is currently held elsewhere; otherwise the task is moved
// to the back of the deque.
48 bool tryPush(shared_ptr<Task> &&task)
51 unique_lock<mutex> lock{mtx, try_to_lock};
// Bail out before moving from `task`, so the caller can retry with the same object.
52 if (!lock) return false;
53 taskDeque.push_back(move(task));
// Blocking acquisition of mtx (waits until the lock is available).
// NOTE(review): presumably guards setting `done` before waking waiters in complete() — confirm against the enclosing function.
64 unique_lock<mutex> lock{mtx};
// Blocking pop.
// Takes mtx (waiting if necessary) and keeps looping while the deque is empty and
// `done` is not set. Returns false when the deque is still empty after the loop,
// which can only happen once `done` is set; otherwise the front task is moved into
// `task` and removed from the deque.
70 bool pop(shared_ptr<Task> &task)
72 unique_lock<mutex> lock{mtx};
// NOTE(review): the loop body presumably waits on `ready` with `lock` — confirm.
74 while (taskDeque.empty() && !done) {
// Empty here means the loop ended because of `done`, not because work arrived.
78 if (taskDeque.empty()) return false;
80 task = move(taskDeque.front());
81 taskDeque.pop_front();
// Blocking push.
// Takes mtx (waiting if necessary) and moves the task to the back of the deque.
// NOTE(review): presumably followed by a notification on `ready` — confirm.
86 void push(shared_ptr<Task> &&task)
89 unique_lock<mutex> lock{mtx};
90 taskDeque.push_back(move(task));
// Scheduler implementation: holds the worker threads and a vector of TaskQueue
// objects, and spreads requested tasks across those queues.
99 class TaskSchedulerImpl
// Worker threads; filled by the loop that calls threads.emplace_back below.
103 vector<thread> threads;
// Queue container initialised from threadCnt.
// NOTE(review): TaskScheduler::init() assigns inst->threadCnt only after
// `new TaskSchedulerImpl` has returned, so this initializer (and the thread-spawning
// loop below) would read threadCnt before that assignment unless its declaration
// provides a value — confirm and consider passing the count to the constructor.
104 vector<TaskQueue> taskQueues{threadCnt};
// Atomic counter starting at 0.
// NOTE(review): presumably supplies the starting queue index in request() — confirm.
105 atomic<unsigned> idx{0};
// Spawn threadCnt threads; each lambda captures its own index by value and calls run(i).
109 for (unsigned i = 0; i < threadCnt; ++i) {
110 threads.emplace_back([&, i] { run(i); });
// Shutdown sequence: call complete() on every queue, then join every worker thread.
116 for (auto& queue : taskQueues) queue.complete();
117 for (auto& thread : threads) thread.join();
// Worker loop state: the task most recently obtained from a queue.
122 shared_ptr<Task> task;
126 auto success = false;
// First make up to threadCnt * 2 non-blocking attempts to grab a task.
// NOTE(review): the index expression adds the loop variable to itself, so the queue
// probed is (2 * i) % threadCnt; with an even threadCnt only even-numbered queues are
// ever tried. The blocking pop below also uses `i`, which suggests this loop variable
// shadows the worker's own index and the intended expression was
// (workerIndex + attempt) % threadCnt — confirm and rename the loop variable.
128 for (unsigned i = 0; i < threadCnt * 2; ++i) {
129 if (taskQueues[(i + i) % threadCnt].tryPop(task)) {
// No task obtained without blocking: fall back to a blocking pop on queue i and
// leave the enclosing loop when that pop returns false.
135 if (!success && !taskQueues[i].pop(task)) break;
// Queue a task for execution (task is taken by value, then moved into a queue).
141 void request(shared_ptr<Task> task)
// Try a non-blocking push on each queue in turn, starting at offset i and wrapping
// around; return as soon as one queue accepts the task. A failed tryPush returns
// before moving from its argument, so `task` is still intact for the next attempt.
147 for (unsigned n = 0; n < threadCnt; ++n) {
148 if (taskQueues[(i + n) % threadCnt].tryPush(move(task))) return;
// Every queue was busy: fall back to a blocking push on the starting queue.
151 taskQueues[i % threadCnt].push(move(task));
// File-local pointer to the scheduler implementation; starts as nullptr and is assigned in TaskScheduler::init().
162 static TaskSchedulerImpl* inst = nullptr;
164 /************************************************************************/
165 /* External Class Implementation */
166 /************************************************************************/
// Creates the scheduler implementation and records the requested thread count.
// threads: value stored into the implementation's threadCnt.
168 void TaskScheduler::init(unsigned threads)
// NOTE(review): threadCnt is assigned after construction has finished, while the
// implementation's member initializers and thread-spawning loop use threadCnt during
// construction — confirm the count they observe is the intended one.
171 inst = new TaskSchedulerImpl;
172 inst->threadCnt = threads;
// NOTE(review): presumably releases the instance created by init() (allocated with `new`) and resets `inst` — confirm against the function body.
176 void TaskScheduler::term()
// Forwards the task to the scheduler implementation, moving the shared_ptr along.
// NOTE(review): `inst` starts as nullptr; confirm a null check guards this call when init() has not run.
184 void TaskScheduler::request(shared_ptr<Task> task)
187 inst->request(move(task));