From fa46459c838c537a51aaeef4c8b665442c8df24c Mon Sep 17 00:00:00 2001 From: Szymon Jastrzebski Date: Fri, 22 Jun 2018 11:24:08 +0200 Subject: [PATCH] [Common] Moving Filesystem Worker's implementation to common Change-Id: I0119cee796112d46bbb5e7eb8ebf9fa7042fc9ff Signed-off-by: Szymon Jastrzebski --- src/common/common.gyp | 2 + src/common/worker.cc | 100 ++++++++++++++++++++++++++ src/common/worker.h | 80 +++++++++++++++++++++ src/filesystem/filesystem_instance.cc | 81 --------------------- src/filesystem/filesystem_instance.h | 53 +------------- 5 files changed, 184 insertions(+), 132 deletions(-) create mode 100644 src/common/worker.cc create mode 100644 src/common/worker.h diff --git a/src/common/common.gyp b/src/common/common.gyp index be35db29..89f99a81 100644 --- a/src/common/common.gyp +++ b/src/common/common.gyp @@ -34,6 +34,8 @@ 'task-queue.h', 'tools.cc', 'tools.h', + 'worker.cc', + 'worker.h', 'optional.h', 'platform_result.cc', 'platform_result.h', diff --git a/src/common/worker.cc b/src/common/worker.cc new file mode 100644 index 00000000..4442c992 --- /dev/null +++ b/src/common/worker.cc @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "worker.h" + +namespace common { + +void Worker::main() { + std::unique_lock lck{jobs_mtx}; + while (true) { + jobs_cond.wait(lck, [this] { return !jobs.empty() || exit; }); + if (exit) { + return; + } + + while (!jobs.empty()) { + auto job = jobs.front(); + jobs.pop_front(); + //////////// end of critical section + lck.unlock(); + + try { + job.func(); + } catch (...) { + // should never happen + LoggerE("Func should never throw"); + } + + try { + job.finally(); + } catch (...) { + // should never happen + LoggerE("Finally should never throw"); + } + + lck.lock(); + //////////// start of critical section + if (exit) { + return; + } + } + } +} + +void Worker::add_job(const std::function& func, const std::function& finally) { + { + std::lock_guard lck{jobs_mtx}; + jobs.push_back({func, finally}); + } + jobs_cond.notify_one(); +} + +Worker::Worker() : exit(false), thread(std::bind(&Worker::main, this)) { +} + +Worker::~Worker() { + if (!exit && thread.joinable()) { + stop(); + } +} + +void Worker::stop() { + { + // use memory barrier for exit flag (could be std::atomic_flag, but we use lock instead) + std::lock_guard lck{jobs_mtx}; + exit = true; + } + jobs_cond.notify_one(); + + try { + thread.join(); + } catch (std::exception& e) { + LoggerE("Failed to join thread: %s", e.what()); + } + + // finalize jobs left in queue + for (auto job : jobs) { + try { + job.finally(); + } catch (...) { + // should never happen + LoggerE("Finally should never throw"); + } + } +} + +} // namespace common \ No newline at end of file diff --git a/src/common/worker.h b/src/common/worker.h new file mode 100644 index 00000000..85aea8b9 --- /dev/null +++ b/src/common/worker.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef WEBAPI_PLUGINS_COMMON_WORKER_H_ +#define WEBAPI_PLUGINS_COMMON_WORKER_H_ + +#include +#include +#include +#include +#include +#include +#include "common/logger.h" + +namespace common { + +/** + * @brief Implements single worker executing in new thread + * + * Jobs are done in FIFO order. If this worker is destroyed all pending jobs are cancelled, + * and all remaining 'finally' functions are called. + */ +class Worker { + private: + friend class TaskQueue; + bool exit; + struct Job { + std::function func; + std::function finally; + }; + std::mutex jobs_mtx; + std::condition_variable jobs_cond; + std::deque jobs; + std::thread thread; + void main(void); + + public: + Worker(); + virtual ~Worker(); + void stop(); + + /** + * @brief Schedule a job + * Parameters will be copied (no reference is held) + * + * @param job function called as a job (should not throw) + * @param finally function called after completion or canceling. (should not throw) + */ + virtual void add_job(const std::function& job, const std::function& finally); + + /** + * @brief Schedule a job. Same as above, but with empty finally function. + * Parameter will be copied (no reference is held) + * + * @param job function called as a job (should not throw) + */ + virtual void add_job(const std::function& job) { + add_job(job, [] {}); + } + + Worker(const Worker&) = delete; + Worker& operator=(const Worker&) = delete; +}; + +} // namespace common + +#endif // WEBAPI_PLUGINS_COMMON_WORKER_H_ \ No newline at end of file diff --git a/src/filesystem/filesystem_instance.cc b/src/filesystem/filesystem_instance.cc index e297ce51..a8ed1fd3 100644 --- a/src/filesystem/filesystem_instance.cc +++ b/src/filesystem/filesystem_instance.cc @@ -97,87 +97,6 @@ FileHandle::~FileHandle() { } } -void FilesystemInstance::Worker::main() { - std::unique_lock lck{jobs_mtx}; - while (true) { - jobs_cond.wait(lck, [this] { return !jobs.empty() || exit; }); - if (exit) { - return; - } - - while (!jobs.empty()) { - auto job = jobs.front(); - jobs.pop_front(); - //////////// end of critical section - lck.unlock(); - - try { - job.func(); - } catch (...) { - // should never happen - LoggerE("Func should never throw"); - } - - try { - job.finally(); - } catch (...) { - // should never happen - LoggerE("Finally should never throw"); - } - - lck.lock(); - //////////// start of critical section - if (exit) { - return; - } - } - } -} - -void FilesystemInstance::Worker::add_job(const std::function& func, - const std::function& finally) { - { - std::lock_guard lck{jobs_mtx}; - jobs.push_back({func, finally}); - } - jobs_cond.notify_one(); -} - -FilesystemInstance::Worker::Worker() - : exit(false), thread(std::bind(&FilesystemInstance::Worker::main, this)) { -} - -FilesystemInstance::Worker::~Worker() { - if (!exit && thread.joinable()) { - stop(); - } -} - -void FilesystemInstance::Worker::stop() { - { - // use memory barrier for exit flag (could be std::atomic_flag, but we use lock instead) - std::lock_guard lck{jobs_mtx}; - exit = true; - } - jobs_cond.notify_one(); - - try { - thread.join(); - } catch (std::exception& e) { - LoggerE("Failed to join thread: %s", e.what()); - } - - // finalize jobs left in queue - for (auto job : jobs) { - try { - job.finally(); - } catch (...) { - // should never happen - LoggerE("Finally should never throw"); - } - } -} - FilesystemInstance::FilesystemInstance() { ScopeLogger(); diff --git a/src/filesystem/filesystem_instance.h b/src/filesystem/filesystem_instance.h index 617fab3a..2de2da3b 100644 --- a/src/filesystem/filesystem_instance.h +++ b/src/filesystem/filesystem_instance.h @@ -21,15 +21,10 @@ #include #include #include -#include #include -#include -#include -#include -#include -#include #include "common/extension.h" #include "common/filesystem/filesystem_storage.h" +#include "common/worker.h" #include "filesystem/filesystem_manager.h" #include "filesystem_utils.h" @@ -60,51 +55,7 @@ class FilesystemInstance : public common::ParsedInstance, FilesystemStateChangeL private: FileHandleMap opened_files; - - /** - * @brief Implements single worker executing in new thread - * - * Jobs are done in FIFO order. If this worker is destroyed all pending jobs are cancelled, - * and all remaining 'finally' functions are called. - */ - class Worker { - bool exit; - struct Job { - std::function func; - std::function finally; - }; - std::mutex jobs_mtx; - std::condition_variable jobs_cond; - std::deque jobs; - std::thread thread; - void main(void); - - public: - void stop(); - Worker(); - ~Worker(); - - /** - * @brief Schedule a job - * Parameters will be copied (no reference is held) - * - * @param job function called as a job (should not throw) - * @param finally function called after completion or canceling. (should not throw) - */ - void add_job(const std::function& job, const std::function& finally); - - /** - * @brief Schedule a job. Same as above, but with empty finally function. - * Parameter will be copied (no reference is held) - * - * @param job function called as a job (should not throw) - */ - void add_job(const std::function& job) { - add_job(job, [] {}); - } - }; - - Worker worker; + common::Worker worker; void FileCreateSync(const picojson::value& args, picojson::object& out); void FileRename(const picojson::value& args, picojson::object& out); -- 2.34.1