Async client stub initial commit
[platform/upstream/csr-framework.git] / src / framework / client / handle-ext.cpp
1 /*
2  *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License
15  */
16 /*
17  * @file        handle-ext.cpp
18  * @author      Kyungwook Tak (k.tak@samsung.com)
19  * @version     1.0
20  * @brief       handle with async request extension
21  */
22 #include "client/handle-ext.h"
23
24 #include <algorithm>
25
26 #include "client/utils.h"
27 #include "common/dispatcher.h"
28 #include "common/audit/logger.h"
29
30 namespace Csr {
31 namespace Client {
32
33 HandleExt::HandleExt() : m_stop(false)
34 {
35 }
36
37 HandleExt::~HandleExt()
38 {
39         DEBUG("Destroying extended handle... join all workers...");
40         eraseJoinableIf();
41 }
42
43 void HandleExt::stop()
44 {
45         DEBUG("Stop & join all workers...");
46         m_stop = true;
47         eraseJoinableIf();
48 }
49
50 bool HandleExt::isStopped() const
51 {
52         return m_stop.load();
53 }
54
55 void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
56 {
57         std::unique_lock<std::mutex> l(m_mutex);
58         DEBUG("clean joinable workers! current worker map size: " <<
59                   m_workerMap.size());
60         auto it = m_workerMap.begin();
61
62         while (it != m_workerMap.end()) {
63                 DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
64
65                 if (!it->second.t.joinable())
66                         throw std::logic_error(FORMAT("All workers should be joinable "
67                                                                                   "but it isn't. tid: " << it->first));
68
69                 if (!pred(*it)) {
70                         ++it;
71                         continue;
72                 }
73
74                 DEBUG("Joining worker! tid:" << it->first);
75                 l.unlock();
76                 it->second.t.join(); // release lock for worker who calls done()
77                 l.lock();
78                 DEBUG("Joined worker! tid:" << it->first);
79                 it = m_workerMap.erase(it);
80         }
81 }
82
83 void HandleExt::done()
84 {
85         std::lock_guard<std::mutex> l(m_mutex);
86         auto it = m_workerMap.find(std::this_thread::get_id());
87
88         if (it == m_workerMap.end())
89                 throw std::logic_error(FORMAT("worker done but it's not registered in map. "
90                                                                           "tid: " << std::this_thread::get_id()));
91
92         it->second.isDone = true;
93 }
94
95 void HandleExt::dispatchAsync(const Task &f)
96 {
97         eraseJoinableIf([](const WorkerMapPair & pair) {
98                 return pair.second.isDone.load();
99         });
100         // TODO: how to handle exceptions in workers
101         std::thread t([this, f] {
102                 DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
103
104                 f();
105                 done();
106
107                 DEBUG("client async thread done! tid: " << std::this_thread::get_id());
108         });
109         {
110                 std::lock_guard<std::mutex> l(m_mutex);
111                 m_workerMap.emplace(t.get_id(), std::move(t));
112         }
113 }
114
115 HandleExt::Worker::Worker() : isDone(false)
116 {
117         DEBUG("Worker default constructor called");
118 }
119
120 HandleExt::Worker::Worker(std::thread &&_t) :
121         isDone(false),
122         t(std::forward<std::thread>(_t))
123 {
124 }
125
126 HandleExt::Worker::Worker(HandleExt::Worker &&other) :
127         isDone(other.isDone.load()),
128         t(std::move(other.t))
129 {
130 }
131
132 HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
133 {
134         isDone = other.isDone.load();
135         t = std::move(other.t);
136         return *this;
137 }
138
139 } // namespace Client
140 } // namespace Csr