Add a lock at channel
[platform/core/system/sensord.git] / src / shared / channel.cpp
1 /*
2  * sensord
3  *
4  * Copyright (c) 2017 Samsung Electronics Co., Ltd.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 #include "channel.h"
21
22 #include <stdint.h>
23 #include <unistd.h>
24 #include <memory>
25 #include <algorithm>
26
27 #include "sensor_log.h"
28 #include "channel_event_handler.h"
29
30 #define SYSTEMD_SOCK_BUF_SIZE (128*1024)
31
32 using namespace ipc;
33
34 class send_event_handler : public event_handler
35 {
36 public:
37         send_event_handler(channel *ch, std::shared_ptr<message> msg)
38         : m_ch(ch)
39         , m_msg(msg)
40         { }
41
42         bool handle(int fd, event_condition condition)
43         {
44                 if (!m_ch) {
45                         return false;
46                 }
47
48                 m_ch->remove_pending_event_id(m_event_id);
49
50                 if (!m_ch->is_connected()) {
51                         return false;
52                 }
53
54                 if (condition & (EVENT_IN | EVENT_HUP)) {
55                         return false;
56                 }
57
58                 if (!m_ch->send_sync(*m_msg)) {
59                         return false;
60                 }
61
62                 return false;
63         }
64
65 private:
66         channel *m_ch;
67         std::shared_ptr<message> m_msg;
68 };
69
70 class read_event_handler : public event_handler
71 {
72 public:
73         read_event_handler(channel *ch)
74         : m_ch(ch)
75         { }
76
77         bool handle(int fd, event_condition condition)
78         {
79                 if (!m_ch) {
80                         return false;
81                 }
82
83                 m_ch->remove_pending_event_id(m_event_id);
84
85                 if (!m_ch->is_connected()) {
86                         return false;
87                 }
88
89                 if (condition & (EVENT_OUT | EVENT_HUP)) {
90                         return false;
91                 }
92
93                 message msg;
94                 if (!m_ch->read_sync(msg, false)) {
95                         return false;
96                 }
97
98                 return false;
99         }
100
101 private:
102         channel *m_ch;
103 };
104
105 channel::channel(socket *sock)
106 : m_fd(sock->get_fd())
107 , m_event_id(0)
108 , m_socket(sock)
109 , m_handler(NULL)
110 , m_loop(NULL)
111 , m_connected(false)
112 {
113         _D("Created");
114 }
115
116 channel::~channel()
117 {
118         _D("Destroyed[%llu]", m_event_id);
119         if (is_connected()) {
120                 disconnect();
121         }
122 }
123
124 uint64_t channel::bind(void)
125 {
126         retv_if(!m_loop, 0);
127         m_event_id = m_loop->add_event(m_socket->get_fd(),
128                         (EVENT_IN | EVENT_HUP | EVENT_NVAL),
129                         dynamic_cast<channel_event_handler *>(m_handler));
130
131         _D("Bound[%llu]", m_event_id);
132         return m_event_id;
133 }
134
135 uint64_t channel::bind(channel_handler *handler, event_loop *loop, bool loop_bind)
136 {
137         m_handler = handler;
138         m_loop = loop;
139         m_connected.store(true);
140
141         if (m_handler)
142                 m_handler->connected(this);
143
144         if (loop_bind)
145                 bind();
146
147         return m_event_id;
148 }
149
150 uint64_t channel::connect(channel_handler *handler, event_loop *loop, bool loop_bind)
151 {
152         if (!m_socket->connect())
153                 return false;
154
155         bind(handler, loop, loop_bind);
156
157         _D("Connected[%llu]", m_event_id);
158         return m_event_id;
159 }
160
161 void channel::disconnect(void)
162 {
163         AUTOLOCK(m_cmutex);
164         if (!is_connected()) {
165                 _D("Channel is not connected");
166                 return;
167         }
168
169         m_connected.store(false);
170
171         _D("Disconnecting..[%llu]", m_event_id);
172
173         if (m_handler) {
174                 m_handler->disconnected(this);
175                 m_handler = NULL;
176         }
177
178         if (m_loop) {
179                 for(auto id : m_pending_event_id) {
180                         _D("Remove pending event id[%llu]", id);
181                         m_loop->remove_event(id, true);
182                 }
183                 _D("Remove event[%llu]", m_event_id);
184                 m_loop->remove_event(m_event_id, true);
185                 m_loop = NULL;
186                 m_event_id = 0;
187         }
188
189         if (m_socket) {
190                 _D("Release socket[%d]", m_socket->get_fd());
191                 delete m_socket;
192                 m_socket = NULL;
193         }
194
195         _D("Disconnected");
196 }
197
198 bool channel::send(std::shared_ptr<message> msg)
199 {
200         int retry_cnt = 0;
201         int cur_buffer_size = 0;
202
203         retv_if(!m_loop, false);
204
205         while (retry_cnt < 3) {
206                 cur_buffer_size = m_socket->get_current_buffer_size();
207                 if (cur_buffer_size <= SYSTEMD_SOCK_BUF_SIZE)
208                         break;
209                 usleep(3000);
210                 retry_cnt++;
211         }
212         retvm_if(retry_cnt >= 3, false, "Socket buffer[%d] is exceeded", cur_buffer_size);
213
214         send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
215         retvm_if(!handler, false, "Failed to allocate memory");
216
217         uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL), handler);
218         if (event_id == 0) {
219                 _D("Failed to add send event handler");
220                 delete handler;
221                 return false;
222         }
223
224         m_pending_event_id.push_back(event_id);
225         return true;
226 }
227
228 bool channel::send_sync(message &msg)
229 {
230         AUTOLOCK(m_cmutex);
231         if (!is_connected()) {
232                 _D("Channel is not connected");
233                 return false;
234         }
235
236         retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
237
238         ssize_t size = 0;
239         char *buf = msg.body();
240
241         /* header */
242         size = m_socket->send(reinterpret_cast<void *>(msg.header()),
243             sizeof(message_header), true);
244         retvm_if(size <= 0, false, "Failed to send header");
245
246         /* if body size is zero, skip to send body message */
247         retv_if(msg.size() == 0, true);
248
249         /* body */
250         size = m_socket->send(buf, msg.size(), true);
251         retvm_if(size <= 0, false, "Failed to send body");
252
253         return true;
254 }
255
256 bool channel::read(void)
257 {
258         retv_if(!m_loop, false);
259
260         read_event_handler *handler = new(std::nothrow) read_event_handler(this);
261         retvm_if(!handler, false, "Failed to allocate memory");
262
263         uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
264         if (event_id == 0) {
265                 _D("Failed to add read event handler");
266                 delete handler;
267                 return false;
268         }
269
270         m_pending_event_id.push_back(event_id);
271         return true;
272 }
273
274 bool channel::read_sync(message &msg, bool select)
275 {
276         AUTOLOCK(m_cmutex);
277         if (!is_connected()) {
278                 _D("Channel is not connected");
279                 return false;
280         }
281
282         message_header header;
283         ssize_t size = 0;
284         char buf[MAX_MSG_CAPACITY];
285
286         /* header */
287         size = m_socket->recv(&header, sizeof(message_header), select);
288         retv_if(size <= 0, false);
289
290         /* check error from header */
291         if (m_handler && header.err != 0) {
292                 m_handler->error_caught(this, header.err);
293                 msg.header()->err = header.err;
294                 return true;
295         }
296
297         /* body */
298         if (header.length >= MAX_MSG_CAPACITY) {
299                 _E("header.length error %u", header.length);
300                 return false;
301         }
302
303         if (header.length > 0) {
304                 size = m_socket->recv(&buf, header.length, select);
305                 retv_if(size <= 0, false);
306         }
307
308         buf[header.length] = '\0';
309         msg.enclose(reinterpret_cast<const void *>(buf), header.length);
310         msg.set_type(header.type);
311         msg.header()->err = header.err;
312
313         if (m_handler)
314                 m_handler->read(this, msg);
315
316         return true;
317 }
318
319 bool channel::is_connected(void)
320 {
321         return m_connected.load();
322 }
323
324 bool channel::set_option(int type, int value)
325 {
326         switch (type) {
327         case SO_SNDBUF:
328                 m_socket->set_buffer_size(type, value);
329                 break;
330         case SO_RCVBUF:
331                 m_socket->set_buffer_size(type, value);
332                 break;
333         default:
334                 break;
335         }
336
337         return true;
338 }
339
340 bool channel::get_option(int type, int &value) const
341 {
342         switch (type) {
343         case 0:
344                 value = m_socket->get_current_buffer_size();
345                 break;
346         case SO_SNDBUF:
347                 value = m_socket->get_buffer_size(type);
348                 break;
349         case SO_RCVBUF:
350                 value = m_socket->get_buffer_size(type);
351                 break;
352         default:
353                 break;
354         }
355
356         return true;
357 }
358
359 int channel::get_fd(void) const
360 {
361         return m_fd;
362 }
363
364 void channel::remove_pending_event_id(uint64_t id)
365 {
366         auto it = std::find(m_pending_event_id.begin(), m_pending_event_id.end(), id);
367         if (it != m_pending_event_id.end()) {
368                 m_pending_event_id.erase(it);
369         }
370 }