2 * Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License
17 * @file handle-ext.cpp
18 * @author Kyungwook Tak (k.tak@samsung.com)
20 * @brief handle with async request extension
22 #include "client/handle-ext.h"
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
// Constructor: takes the context by rvalue reference and moves it into the
// Handle initializer (presumably the base class - confirm against the header).
// The trailing comma shows further member initializers follow; they and the
// constructor body are not visible in this excerpt.
33 HandleExt::HandleExt(ContextShPtr &&context) :
34 Handle(std::move(context)),
// Destructor: logs that the extended handle is being destroyed. The log text
// states that all workers are joined at this point; the statement performing
// the join is not visible in this excerpt.
39 HandleExt::~HandleExt()
41 DEBUG("Destroying extended handle... join all workers...");
// Logs a stop request. According to the log text this stops and joins all
// workers; the statements that do so are not visible in this excerpt.
45 void HandleExt::stop()
47 DEBUG("Stop & join all workers...");
// Const query returning bool. The body is not visible in this excerpt;
// presumably it reports whether stop() has been requested - TODO confirm.
52 bool HandleExt::isStopped() const
// Takes m_mutex, then walks m_workerMap joining worker threads and erasing
// their entries.
//
// @param pred  predicate over a worker map entry. Its call site is not visible
//              in this excerpt; presumably only entries for which it returns
//              true are joined and erased - TODO confirm.
// @throws std::logic_error if a visited entry holds a thread that is not
//         joinable.
57 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
// unique_lock rather than lock_guard: it can be unlocked and re-locked by hand.
59 std::unique_lock<std::mutex> l(m_mutex);
60 DEBUG("clean joinable workers! current worker map size: " <<
62 auto it = m_workerMap.begin();
64 while (it != m_workerMap.end()) {
65 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
// Every thread kept in the map is expected to be joinable; anything else is
// reported as a logic error.
67 if (!it->second.t.joinable())
68 throw std::logic_error(FORMAT("All workers should be joinable "
69 "but it isn't. tid: " << it->first));
// done() locks m_mutex as well, so a worker still on its way to done() needs
// the mutex to be free while this thread waits in join(). The unlock/lock
// calls implied by the note on the join line are not visible in this excerpt.
76 DEBUG("Joining worker! tid:" << it->first);
78 it->second.t.join(); // release lock for worker who calls done()
80 DEBUG("Joined worker! tid:" << it->first);
// erase() hands back the iterator following the removed entry, which keeps the
// traversal valid.
81 it = m_workerMap.erase(it);
// Marks the worker entry of the calling thread as finished.
//
// Looks up std::this_thread::get_id() in m_workerMap while holding m_mutex and
// sets the isDone flag of that entry.
//
// @throws std::logic_error if the calling thread has no entry in m_workerMap.
85 void HandleExt::done()
87 std::lock_guard<std::mutex> l(m_mutex);
88 auto it = m_workerMap.find(std::this_thread::get_id());
90 if (it == m_workerMap.end())
91 throw std::logic_error(FORMAT("worker done but it's not registered in map. "
92 "tid: " << std::this_thread::get_id()));
// dispatchAsync() reads this flag through isDone.load() in the predicate it
// hands to eraseJoinableIf().
94 it->second.isDone = true;
// Starts a new std::thread for task f and records that thread in m_workerMap.
//
// Before spawning, eraseJoinableIf() is called with a predicate that is true
// for map entries whose isDone flag is set.
//
// @param f  task captured by copy in the thread lambda. The statements between
//           the two DEBUG lines of the lambda are not visible in this excerpt;
//           presumably f is invoked there and done() is called afterwards -
//           TODO confirm.
97 void HandleExt::dispatchAsync(const Task &f)
99 eraseJoinableIf([](const WorkerMapPair & pair) {
100 return pair.second.isDone.load();
102 // TODO: how to handle exceptions in workers
// The lambda captures this by pointer and f by copy, so the handle has to
// outlive the spawned thread.
103 std::thread t([this, f] {
104 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
109 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
// The entry is keyed by the id of the new thread and the std::thread object is
// moved into the map under m_mutex.
// NOTE(review): the thread is already running when it gets inserted here, and
// done() throws std::logic_error when the calling thread is not in the map.
// If the worker can reach done() before this insert, that exception escapes
// the thread function - confirm the ordering is safe.
113 std::lock_guard<std::mutex> l(m_mutex);
114 m_workerMap.emplace(t.get_id(), std::move(t));
// Default constructor: initializes isDone to false and logs that it was called.
118 HandleExt::Worker::Worker() : isDone(false)
120 DEBUG("Worker default constructor called");
// Constructs a worker from an rvalue std::thread, which is passed on as an
// rvalue to initialize member t. A line between the signature and the t(...)
// initializer appears to be elided in this excerpt.
//
// NOTE(review): _t is a plain rvalue reference, not a forwarding reference, so
// std::forward<std::thread> acts exactly like std::move here; std::move would
// state the intent directly.
123 HandleExt::Worker::Worker(std::thread &&_t) :
125 t(std::forward<std::thread>(_t))
// Move constructor: initializes isDone from the value currently loaded from
// other.isDone and moves the thread of other into this worker.
// isDone exposes load(), so it is presumably a std::atomic<bool>, which is
// neither copyable nor movable - hence the explicit load().
129 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
130 isDone(other.isDone.load()),
131 t(std::move(other.t))
// Move assignment: stores the value currently loaded from other.isDone and
// move-assigns the thread of other. The return statement is not visible in
// this excerpt.
//
// NOTE(review): std::thread move assignment calls std::terminate when the
// target thread is still joinable - confirm this operator is only used on
// workers whose thread is not joinable.
135 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
137 isDone = other.isDone.load();
138 t = std::move(other.t);
// Appends ptr to m_results while holding m_resultsMutex.
//
// @param ptr  result handed over by rvalue reference; it is passed on to
//             emplace_back as an rvalue.
//
// NOTE(review): ptr is a named rvalue reference rather than a forwarding
// reference, so std::forward<ResultPtr> behaves like std::move here; std::move
// would state the intent directly.
142 void HandleExt::add(ResultPtr &&ptr)
144 std::lock_guard<std::mutex> l(m_resultsMutex);
145 m_results.emplace_back(std::forward<ResultPtr>(ptr));
// Appends ptr to m_resultLists while holding m_resultsMutex.
//
// @param ptr  result list handed over by rvalue reference; it is passed on to
//             emplace_back as an rvalue.
//
// NOTE(review): ptr is a named rvalue reference rather than a forwarding
// reference, so std::forward<ResultListPtr> behaves like std::move here;
// std::move would state the intent directly.
148 void HandleExt::add(ResultListPtr &&ptr)
150 std::lock_guard<std::mutex> l(m_resultsMutex);
151 m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
154 } // namespace Client