Async handle only take one scanning in background
[platform/upstream/csr-framework.git] / src / framework / client / handle-ext.cpp
1 /*
2  *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License
15  */
16 /*
17  * @file        handle-ext.cpp
18  * @author      Kyungwook Tak (k.tak@samsung.com)
19  * @version     1.0
20  * @brief       handle with async request extension
21  */
22 #include "client/handle-ext.h"
23
24 #include <algorithm>
25
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
29 #include "common/exception.h"
30
31 namespace Csr {
32 namespace Client {
33
34 HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
35         Handle(id, std::move(context)),
36         m_stop(false)
37 {
38 }
39
40 HandleExt::~HandleExt()
41 {
42         DEBUG("Destroying extended handle... join all workers...");
43         eraseJoinableIf();
44 }
45
46 void HandleExt::stop()
47 {
48         DEBUG("Stop & join all workers...");
49         m_stop = true;
50         eraseJoinableIf();
51 }
52
53 bool HandleExt::isStopped() const noexcept
54 {
55         return m_stop.load();
56 }
57
58 bool HandleExt::hasRunning()
59 {
60         std::unique_lock<std::mutex> l(m_mutex);
61         auto it = m_workerMap.begin();
62
63         while (it != m_workerMap.end()) {
64                 if (!it->second.isDone.load())
65                         return true;
66         }
67
68         return false;
69 }
70
71 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
72 {
73         std::unique_lock<std::mutex> l(m_mutex);
74         DEBUG("clean joinable workers! current worker map size: " <<
75                   m_workerMap.size());
76         auto it = m_workerMap.begin();
77
78         while (it != m_workerMap.end()) {
79                 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
80
81                 if (!it->second.t.joinable())
82                         ThrowExc(InternalError, "All workers should be joinable but it isn't. "
83                                          "tid: " << it->first);
84
85                 if (!pred(*it)) {
86                         ++it;
87                         continue;
88                 }
89
90                 DEBUG("Joining worker! tid:" << it->first);
91                 l.unlock();
92                 it->second.t.join(); // release lock for worker who calls done()
93                 l.lock();
94                 DEBUG("Joined worker! tid:" << it->first);
95                 it = m_workerMap.erase(it);
96         }
97 }
98
99 void HandleExt::done()
100 {
101         std::lock_guard<std::mutex> l(m_mutex);
102         auto it = m_workerMap.find(std::this_thread::get_id());
103
104         if (it == m_workerMap.end())
105                 ThrowExc(InternalError, "worker done but it's not registered in map. "
106                                  "tid: " << std::this_thread::get_id());
107
108         it->second.isDone = true;
109 }
110
111 void HandleExt::dispatchAsync(const Task &f)
112 {
113         eraseJoinableIf([](const WorkerMapPair & pair) {
114                 return pair.second.isDone.load();
115         });
116         // TODO: how to handle exceptions in workers
117         std::thread t([this, f] {
118                 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
119                 m_stop = false;
120
121                 f();
122                 done();
123
124                 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
125         });
126
127         {
128                 std::lock_guard<std::mutex> l(m_mutex);
129                 m_workerMap.emplace(t.get_id(), std::move(t));
130         }
131 }
132
133 HandleExt::Worker::Worker() : isDone(false)
134 {
135         DEBUG("Worker default constructor called");
136 }
137
138 HandleExt::Worker::Worker(std::thread &&_t) :
139         isDone(false),
140         t(std::forward<std::thread>(_t))
141 {
142 }
143
144 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
145         isDone(other.isDone.load()),
146         t(std::move(other.t))
147 {
148 }
149
150 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
151 {
152         isDone = other.isDone.load();
153         t = std::move(other.t);
154         return *this;
155 }
156
157 void HandleExt::add(ResultPtr &&ptr)
158 {
159         std::lock_guard<std::mutex> l(m_resultsMutex);
160         m_results.emplace_back(std::forward<ResultPtr>(ptr));
161 }
162
163 void HandleExt::add(ResultListPtr &&ptr)
164 {
165         std::lock_guard<std::mutex> l(m_resultsMutex);
166         m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
167 }
168
169 } // namespace Client
170 } // namespace Csr