From 275c2f9d7901bbf5d44e1fa2cf3a4fce32f2fc8f Mon Sep 17 00:00:00 2001 From: Subhransu Mohanty Date: Thu, 13 Aug 2020 13:01:53 +0900 Subject: [PATCH] tvg: added task support that runs on a threadpool. this patch adds an async() function that takes a shared_task and runs asyncronously in a threadpool. Change-Id: I02a47df6938656828f924fbf5e2bc075073b329b --- src/lib/meson.build | 1 + src/lib/tvgTask.cpp | 156 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib/tvgTask.h | 72 ++++++++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 src/lib/tvgTask.cpp create mode 100644 src/lib/tvgTask.h diff --git a/src/lib/meson.build b/src/lib/meson.build index 631016d..503c35e 100644 --- a/src/lib/meson.build +++ b/src/lib/meson.build @@ -35,6 +35,7 @@ source_file = [ 'tvgScene.cpp', 'tvgShape.cpp', 'tvgSwCanvas.cpp', + 'tvgTask.cpp', ] common_dep = declare_dependency( diff --git a/src/lib/tvgTask.cpp b/src/lib/tvgTask.cpp new file mode 100644 index 0000000..81ce22e --- /dev/null +++ b/src/lib/tvgTask.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2020 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include + +template +class TaskQueue { + using lock_t = std::unique_lock; + std::deque _q; + bool _done{false}; + std::mutex _mutex; + std::condition_variable _ready; + +public: + bool try_pop(Task &task) + { + lock_t lock{_mutex, std::try_to_lock}; + if (!lock || _q.empty()) return false; + task = std::move(_q.front()); + _q.pop_front(); + return true; + } + + bool try_push(Task &&task) + { + { + lock_t lock{_mutex, std::try_to_lock}; + if (!lock) return false; + _q.push_back(std::move(task)); + } + _ready.notify_one(); + return true; + } + + void done() + { + { + lock_t lock{_mutex}; + _done = true; + } + _ready.notify_all(); + } + + bool pop(Task &task) + { + lock_t lock{_mutex}; + while (_q.empty() && !_done) _ready.wait(lock); + if (_q.empty()) return false; + task = std::move(_q.front()); + _q.pop_front(); + return true; + } + + void push(Task &&task) + { + { + lock_t lock{_mutex}; + _q.push_back(std::move(task)); + } + _ready.notify_one(); + } + +}; + +#include +#include + +namespace tvg +{ + +class Executor +{ + const unsigned _count{std::thread::hardware_concurrency()}; + std::vector _threads; + std::vector> _q{_count}; + std::atomic _index{0}; + void run(unsigned i) + { + // Task Loop + shared_task task; + while (true) { + bool success = false; + + for (unsigned n = 0; n != _count * 2; ++n) { + if (_q[(i + n) % _count].try_pop(task)) { + success = true; + break; + } + } + + if (!success && !_q[i].pop(task)) break; + + (*task)(); + } + } + + Executor() + { + for (unsigned n = 0; n != _count; ++n) { + _threads.emplace_back([&, n] { run(n); }); + } + } + ~Executor() + { + for (auto &e : _q) e.done(); + + for (auto &e : _threads) e.join(); + } + +public: + + static Executor& instance() { + static Executor singleton; + return singleton; + } + + void post(shared_task task) + { + task->prepare(); + + auto i = _index++; + + for (unsigned n = 0; n != _count; ++n) { + if (_q[(i + n) % _count].try_push(std::move(task))) return; + } + + if (_count > 0) { + _q[i % _count].push(std::move(task)); + } + } +}; + +void async(shared_task task) +{ + Executor::instance().post(std::move(task)); +} + +} + + diff --git a/src/lib/tvgTask.h b/src/lib/tvgTask.h new file mode 100644 index 0000000..9fb250e --- /dev/null +++ b/src/lib/tvgTask.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TVG_TASK_H_ +#define _TVG_TASK_H_ + +#include +#include + +namespace tvg +{ + +/* + Task Interface. + Able to run a task in the thread pool. derive from the + task interface and implement run method. + + To get the result call task->get() which will return immidiately if the + task is already finishd otherwise will wait till task completion. + */ + +class Task +{ +public: + virtual ~Task() = default; + void get() { if (_receiver.valid()) _receiver.get(); } + +protected: + virtual void run() = 0; +private: + void operator()() + { + run(); + _sender.set_value(); + } + void prepare() + { + _sender = std::promise(); + _receiver = _sender.get_future(); + } + friend class Executor; + + std::promise _sender; + std::future _receiver; +}; + + +using shared_task = std::shared_ptr; + +/* + async() function takes a shared task and runs it in + a thread pool asyncronously. call get() on the shared_task + to get the ressult out of the shared_task. + */ +void async(shared_task task); + +} + +#endif //_TVG_TASK_H_ \ No newline at end of file -- 2.7.4