Use socket per module (CS, WP, ADMIN)
[platform/upstream/csr-framework.git] / src / framework / client / handle-ext.cpp
1 /*
2  *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License
15  */
16 /*
17  * @file        handle-ext.cpp
18  * @author      Kyungwook Tak (k.tak@samsung.com)
19  * @version     1.0
20  * @brief       handle with async request extension
21  */
22 #include "client/handle-ext.h"
23
24 #include <algorithm>
25
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
29 #include "common/exception.h"
30
31 namespace Csr {
32 namespace Client {
33
34 HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
35         Handle(id, std::move(context)),
36         m_stop(false)
37 {
38 }
39
40 HandleExt::~HandleExt()
41 {
42         DEBUG("Destroying extended handle... join all workers...");
43         eraseJoinableIf();
44 }
45
46 void HandleExt::stop()
47 {
48         DEBUG("Stop & join all workers...");
49         m_stop = true;
50         eraseJoinableIf();
51 }
52
53 bool HandleExt::isStopped() const
54 {
55         return m_stop.load();
56 }
57
58 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
59 {
60         std::unique_lock<std::mutex> l(m_mutex);
61         DEBUG("clean joinable workers! current worker map size: " <<
62                   m_workerMap.size());
63         auto it = m_workerMap.begin();
64
65         while (it != m_workerMap.end()) {
66                 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
67
68                 if (!it->second.t.joinable())
69                         ThrowExc(InternalError, "All workers should be joinable but it isn't. "
70                                          "tid: " << it->first);
71
72                 if (!pred(*it)) {
73                         ++it;
74                         continue;
75                 }
76
77                 DEBUG("Joining worker! tid:" << it->first);
78                 l.unlock();
79                 it->second.t.join(); // release lock for worker who calls done()
80                 l.lock();
81                 DEBUG("Joined worker! tid:" << it->first);
82                 it = m_workerMap.erase(it);
83         }
84 }
85
86 void HandleExt::done()
87 {
88         std::lock_guard<std::mutex> l(m_mutex);
89         auto it = m_workerMap.find(std::this_thread::get_id());
90
91         if (it == m_workerMap.end())
92                 ThrowExc(InternalError, "worker done but it's not registered in map. "
93                                  "tid: " << std::this_thread::get_id());
94
95         it->second.isDone = true;
96 }
97
98 void HandleExt::dispatchAsync(const Task &f)
99 {
100         eraseJoinableIf([](const WorkerMapPair & pair) {
101                 return pair.second.isDone.load();
102         });
103         // TODO: how to handle exceptions in workers
104         std::thread t([this, f] {
105                 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
106
107                 f();
108                 done();
109
110                 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
111         });
112
113         {
114                 std::lock_guard<std::mutex> l(m_mutex);
115                 m_workerMap.emplace(t.get_id(), std::move(t));
116         }
117 }
118
119 HandleExt::Worker::Worker() : isDone(false)
120 {
121         DEBUG("Worker default constructor called");
122 }
123
124 HandleExt::Worker::Worker(std::thread &&_t) :
125         isDone(false),
126         t(std::forward<std::thread>(_t))
127 {
128 }
129
130 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
131         isDone(other.isDone.load()),
132         t(std::move(other.t))
133 {
134 }
135
136 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
137 {
138         isDone = other.isDone.load();
139         t = std::move(other.t);
140         return *this;
141 }
142
143 void HandleExt::add(ResultPtr &&ptr)
144 {
145         std::lock_guard<std::mutex> l(m_resultsMutex);
146         m_results.emplace_back(std::forward<ResultPtr>(ptr));
147 }
148
149 void HandleExt::add(ResultListPtr &&ptr)
150 {
151         std::lock_guard<std::mutex> l(m_resultsMutex);
152         m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
153 }
154
155 } // namespace Client
156 } // namespace Csr