2 * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License
17 * @file handle-ext.cpp
18 * @author Kyungwook Tak (k.tak@samsung.com)
20 * @brief handle with async request extension
22 #include "client/handle-ext.h"
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
// Constructs the extended handle with the stop flag m_stop initialised to false.
// NOTE(review): the constructor body is not visible here — confirm it is empty.
33 HandleExt::HandleExt() : m_stop(false)
// Destructor. Logs that the handle is being destroyed and that all workers
// are to be joined.
// NOTE(review): the statement that actually performs the join is not visible
// here — presumably a call to eraseJoinableIf(); confirm.
37 HandleExt::~HandleExt()
39 DEBUG("Destroying extended handle... join all workers...");
// Requests a stop of the handle; logs "Stop & join all workers...".
// NOTE(review): the statements following the log line are not visible here —
// presumably m_stop is set and the workers are joined; confirm.
43 void HandleExt::stop()
45 DEBUG("Stop & join all workers...");
// Const query returning a bool.
// NOTE(review): the body is not visible here — presumably it reports the
// current value of m_stop; confirm.
50 bool HandleExt::isStopped() const
// Walks m_workerMap, joining worker threads and erasing their entries.
//
// @param pred  predicate over a WorkerMapPair.
//              NOTE(review): the call to pred is not visible here — presumably
//              only entries it accepts are joined and erased; confirm.
// @throws std::logic_error if an entry's thread is not joinable.
55 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
// Holds m_mutex through a std::unique_lock.
// NOTE(review): presumably unique_lock is used so the lock can be dropped
// around join() (see the comment on the join line); the unlock()/lock() calls
// themselves are not visible here — confirm.
57 std::unique_lock<std::mutex> l(m_mutex);
58 DEBUG("clean joinable workers! current worker map size: " <<
60 auto it = m_workerMap.begin();
// Manual iterator loop: entries are erased while traversing, so the iterator
// is advanced explicitly rather than by a range-for.
62 while (it != m_workerMap.end()) {
63 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
// Every thread stored in the map is expected to be joinable; anything else is
// treated as a programming error.
65 if (!it->second.t.joinable())
66 throw std::logic_error(FORMAT("All workers should be joinable "
67 "but it isn't. tid: " << it->first));
74 DEBUG("Joining worker! tid:" << it->first);
// Blocks until the worker thread finishes.
76 it->second.t.join(); // release lock for worker who calls done()
78 DEBUG("Joined worker! tid:" << it->first);
// Continue the walk from the iterator that erase() returns.
79 it = m_workerMap.erase(it);
// Marks the calling thread's worker entry as done.
//
// Looks up std::this_thread::get_id() in m_workerMap while holding m_mutex and
// sets that entry's isDone flag to true.
//
// @throws std::logic_error if the calling thread has no entry in m_workerMap.
83 void HandleExt::done()
// Lock is held for the lookup and the flag update.
85 std::lock_guard<std::mutex> l(m_mutex);
// The map is keyed by thread id, so the caller must be the worker thread itself.
86 auto it = m_workerMap.find(std::this_thread::get_id());
88 if (it == m_workerMap.end())
89 throw std::logic_error(FORMAT("worker done but it's not registered in map. "
90 "tid: " << std::this_thread::get_id()));
92 it->second.isDone = true;
// Starts a new worker thread for the given task and registers it in
// m_workerMap under the new thread's id.
//
// @param f  task for the worker; it is captured by copy into the thread's
//           lambda.
//           NOTE(review): the statements between the two DEBUG lines inside
//           the lambda are not visible here — presumably f() is invoked and
//           done() is called afterwards; confirm.
95 void HandleExt::dispatchAsync(const Task &f)
// First pass eraseJoinableIf a predicate that selects entries whose isDone
// flag is set.
97 eraseJoinableIf([](const WorkerMapPair & pair) {
98 return pair.second.isDone.load();
100 // TODO: how to handle exceptions in workers
// The lambda captures `this`, so the HandleExt object must stay alive for as
// long as the worker thread runs.
// NOTE(review): the thread starts running here, before it is inserted into
// m_workerMap below. If the worker reaches done() before the insertion,
// done() throws because its tid is not registered yet — confirm whether
// the lines not visible here prevent that ordering.
101 std::thread t([this, f] {
102 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
107 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
// Registration in the map happens under m_mutex.
110 std::lock_guard<std::mutex> l(m_mutex);
// std::move(t) is only a cast; the thread object is moved from inside
// emplace(), after t.get_id() has already been evaluated as the key.
111 m_workerMap.emplace(t.get_id(), std::move(t));
// Default constructor: initialises isDone to false and logs that it was called.
// The thread member t is not mentioned in the initializer list.
115 HandleExt::Worker::Worker() : isDone(false)
117 DEBUG("Worker default constructor called");
// Constructs a Worker that takes over the given thread.
//
// @param _t  thread to take over; it is moved into the member t.
//
// std::forward<std::thread> on this non-deduced rvalue reference yields an
// rvalue, so it behaves like std::move(_t) here.
// NOTE(review): part of the initializer list is not visible here —
// presumably isDone is initialised to false as in the default constructor;
// confirm.
120 HandleExt::Worker::Worker(std::thread &&_t) :
122 t(std::forward<std::thread>(_t))
// Move constructor.
//
// @param other  source Worker. Its isDone value is read with load() and
//               copied; its thread is moved into this object.
126 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
127 isDone(other.isDone.load()),
128 t(std::move(other.t))
// Move assignment.
//
// @param other  source Worker. Its isDone value is read with load() and
//               stored into this object's flag; its thread is move-assigned
//               into t.
// @return reference to a Worker.
//         NOTE(review): the return statement is not visible here —
//         presumably *this; confirm.
132 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
134 isDone = other.isDone.load();
// std::thread move assignment calls std::terminate() if the target thread is
// still joinable, so t must not own a running thread at this point.
135 t = std::move(other.t);
139 } // namespace Client