2 * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License
17 * @file handle-ext.cpp
18 * @author Kyungwook Tak (k.tak@samsung.com)
20 * @brief handle with async request extension
22 #include "client/handle-ext.h"
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
29 #include "common/exception.h"
34 HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
35 Handle(id, std::move(context)),
40 HandleExt::~HandleExt()
42 DEBUG("Destroying extended handle... join all workers...");
46 void HandleExt::stop()
48 DEBUG("Stop & join all workers...");
53 bool HandleExt::isStopped() const
58 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
60 std::unique_lock<std::mutex> l(m_mutex);
61 DEBUG("clean joinable workers! current worker map size: " <<
63 auto it = m_workerMap.begin();
65 while (it != m_workerMap.end()) {
66 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
68 if (!it->second.t.joinable())
69 ThrowExc(InternalError, "All workers should be joinable but it isn't. "
70 "tid: " << it->first);
77 DEBUG("Joining worker! tid:" << it->first);
79 it->second.t.join(); // release lock for worker who calls done()
81 DEBUG("Joined worker! tid:" << it->first);
82 it = m_workerMap.erase(it);
86 void HandleExt::done()
88 std::lock_guard<std::mutex> l(m_mutex);
89 auto it = m_workerMap.find(std::this_thread::get_id());
91 if (it == m_workerMap.end())
92 ThrowExc(InternalError, "worker done but it's not registered in map. "
93 "tid: " << std::this_thread::get_id());
95 it->second.isDone = true;
98 void HandleExt::dispatchAsync(const Task &f)
100 eraseJoinableIf([](const WorkerMapPair & pair) {
101 return pair.second.isDone.load();
103 // TODO: how to handle exceptions in workers
104 std::thread t([this, f] {
105 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
110 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
114 std::lock_guard<std::mutex> l(m_mutex);
115 m_workerMap.emplace(t.get_id(), std::move(t));
119 HandleExt::Worker::Worker() : isDone(false)
121 DEBUG("Worker default constructor called");
124 HandleExt::Worker::Worker(std::thread &&_t) :
126 t(std::forward<std::thread>(_t))
130 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
131 isDone(other.isDone.load()),
132 t(std::move(other.t))
136 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
138 isDone = other.isDone.load();
139 t = std::move(other.t);
143 void HandleExt::add(ResultPtr &&ptr)
145 std::lock_guard<std::mutex> l(m_resultsMutex);
146 m_results.emplace_back(std::forward<ResultPtr>(ptr));
149 void HandleExt::add(ResultListPtr &&ptr)
151 std::lock_guard<std::mutex> l(m_resultsMutex);
152 m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
155 } // namespace Client