2 * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License
17 * @file handle-ext.cpp
18 * @author Kyungwook Tak (k.tak@samsung.com)
20 * @brief handle with async request extension
22 #include "client/handle-ext.h"
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
29 #include "common/exception.h"
// Builds an extended handle from a socket id and a context. Both arguments
// are passed on to the Handle initializer (presumably the base class); the
// context is moved, so the caller's ContextShPtr is left moved-from.
// NOTE(review): the initializer list continues after the trailing comma
// below, but the remaining initializers and the body are not visible in
// this excerpt.
34 HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
35 Handle(id, std::move(context)),
// Destructor. Logs that the extended handle is being destroyed and that all
// workers are about to be joined.
// NOTE(review): the statement that performs the join is not visible in this
// excerpt; presumably a call to eraseJoinableIf - confirm.
40 HandleExt::~HandleExt()
42 DEBUG("Destroying extended handle... join all workers...");
// Requests a stop. Logs that all workers are being stopped and joined.
// NOTE(review): the statements following the log line are not visible in
// this excerpt; presumably they set a stop flag and join the workers -
// confirm.
46 void HandleExt::stop()
48 DEBUG("Stop & join all workers...");
// Const, noexcept boolean query.
// NOTE(review): only the signature is visible in this excerpt; presumably
// it reports whether stop() has been requested - confirm against the body.
53 bool HandleExt::isStopped() const noexcept
// Walks m_workerMap while holding m_mutex and tests the isDone flag of each
// worker entry. done() is what sets that flag to true, so an entry whose
// flag is still false belongs to a worker that has not reported completion.
// NOTE(review): the branch bodies, the iterator advance and the return
// statements are not visible in this excerpt; presumably the result is true
// as soon as one unfinished worker is found and false otherwise - confirm.
58 bool HandleExt::hasRunning()
60 std::unique_lock<std::mutex> l(m_mutex);
61 auto it = m_workerMap.begin();
63 while (it != m_workerMap.end()) {
// isDone is read with load(), so it is an atomic-style flag.
64 if (!it->second.isDone.load())
// Traverses m_workerMap, joins worker threads and erases their entries.
// m_mutex is taken as a std::unique_lock at entry.
//
// pred: predicate over one worker map entry.
//       NOTE(review): the place where pred is evaluated is not visible in
//       this excerpt; presumably entries for which it returns false are
//       skipped rather than joined - confirm.
//
// Raises InternalError through ThrowExc when a visited worker thread is not
// joinable.
71 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
73 std::unique_lock<std::mutex> l(m_mutex);
74 DEBUG("clean joinable workers! current worker map size: " <<
76 auto it = m_workerMap.begin();
78 while (it != m_workerMap.end()) {
79 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
// Every thread stored in the map is expected to be joinable; anything else
// is treated as an internal error.
81 if (!it->second.t.joinable())
82 ThrowExc(InternalError, "All workers should be joinable but it isn't. "
83 "tid: " << it->first);
90 DEBUG("Joining worker! tid:" << it->first);
// done() locks m_mutex as well, so a worker that is still finishing cannot
// complete while this function holds the lock.
// NOTE(review): the unlock/lock pair that the trailing comment below refers
// to is not visible in this excerpt - confirm it surrounds the join.
92 it->second.t.join(); // release lock for worker who calls done()
94 DEBUG("Joined worker! tid:" << it->first);
// The iterator returned by erase() is used to continue the traversal.
95 it = m_workerMap.erase(it);
// Marks the worker entry of the calling thread as finished.
// Under m_mutex, looks up std::this_thread::get_id() in m_workerMap and sets
// the isDone flag of that entry to true.
// Raises InternalError through ThrowExc when the calling thread has no entry
// in the map.
99 void HandleExt::done()
101 std::lock_guard<std::mutex> l(m_mutex);
102 auto it = m_workerMap.find(std::this_thread::get_id());
104 if (it == m_workerMap.end())
105 ThrowExc(InternalError, "worker done but it's not registered in map. "
106 "tid: " << std::this_thread::get_id());
108 it->second.isDone = true;
// Spawns a worker thread for task f and records it in m_workerMap.
//
// Steps visible here:
//  1. eraseJoinableIf is called with a predicate that selects entries whose
//     isDone flag is set.
//  2. A std::thread is started with a lambda that captures this and a copy
//     of f, and logs at start and at end.
//  3. Under m_mutex, the thread is moved into m_workerMap keyed by its id.
//
// f: the task; captured by value in the thread lambda.
//
// NOTE(review): the statements between the two DEBUG lines of the lambda are
// not visible in this excerpt; presumably they invoke f and then call
// done() - confirm.
// NOTE(review): the thread is started before its entry is inserted into the
// map. If the lambda reaches done() before the emplace below has run, done()
// would not find the entry. Whether that can happen depends on lines not
// visible here - confirm.
111 void HandleExt::dispatchAsync(const Task &f)
113 eraseJoinableIf([](const WorkerMapPair & pair) {
114 return pair.second.isDone.load();
116 // TODO: how to handle exceptions in workers
117 std::thread t([this, f] {
118 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
123 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
127 std::lock_guard<std::mutex> l(m_mutex);
// std::move is only a cast; the thread is actually moved from inside emplace,
// after t.get_id() has already been evaluated as the key argument.
128 m_workerMap.emplace(t.get_id(), std::move(t));
// Default constructor: initializes isDone to false and logs that the default
// constructor was called.
132 HandleExt::Worker::Worker() : isDone(false)
134 DEBUG("Worker default constructor called");
// Constructs a worker that takes over the given thread: _t is transferred
// into member t. Because _t is a plain rvalue reference (not a deduced
// forwarding reference), std::forward<std::thread> yields an rvalue here and
// therefore acts like std::move.
// NOTE(review): one initializer line between the signature and the t
// initializer is not visible in this excerpt; presumably it sets isDone to
// false - confirm.
137 HandleExt::Worker::Worker(std::thread &&_t) :
139 t(std::forward<std::thread>(_t))
// Move constructor. The isDone flag is initialized from a snapshot of the
// flag in other (read with load()), and the thread object is moved out of
// other.t into this worker. The flag in other is left unchanged.
143 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
144 isDone(other.isDone.load()),
145 t(std::move(other.t))
// Move assignment. Stores a snapshot of the isDone flag of other (read with
// load()) into this worker, then move-assigns the thread object.
// Per std::thread move assignment, the program terminates if this worker
// still holds a joinable thread when t is assigned.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns *this - confirm.
149 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
151 isDone = other.isDone.load();
152 t = std::move(other.t);
// Appends a single result to m_results while holding m_resultsMutex.
// ptr is an rvalue reference, so std::forward<ResultPtr> passes it on as an
// rvalue (same effect as std::move, assuming ResultPtr is a non-reference
// type) and the caller's object is left moved-from.
156 void HandleExt::add(ResultPtr &&ptr)
158 std::lock_guard<std::mutex> l(m_resultsMutex);
159 m_results.emplace_back(std::forward<ResultPtr>(ptr));
// Appends a result list to m_resultLists while holding m_resultsMutex (the
// same mutex that guards m_results in the ResultPtr overload).
// ptr is an rvalue reference, so std::forward<ResultListPtr> passes it on as
// an rvalue (same effect as std::move, assuming ResultListPtr is a
// non-reference type) and the caller's object is left moved-from.
162 void HandleExt::add(ResultListPtr &&ptr)
164 std::lock_guard<std::mutex> l(m_resultsMutex);
165 m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
168 } // namespace Client