Delete pending events when disconnecting channel
[platform/core/system/sensord.git] / src / shared / channel.cpp
1 /*
2  * sensord
3  *
4  * Copyright (c) 2017 Samsung Electronics Co., Ltd.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 #include "channel.h"
21
22 #include <stdint.h>
23 #include <unistd.h>
24 #include <memory>
25 #include <algorithm>
26
27 #include "sensor_log.h"
28 #include "channel_event_handler.h"
29
30 #define SYSTEMD_SOCK_BUF_SIZE (128*1024)
31
32 using namespace ipc;
33
34 class send_event_handler : public event_handler
35 {
36 public:
37         send_event_handler(channel *ch, std::shared_ptr<message> msg)
38         : m_ch(ch)
39         , m_msg(msg)
40         { }
41
42         bool handle(int fd, event_condition condition)
43         {
44                 if (!m_ch) {
45                         return false;
46                 }
47
48                 m_ch->remove_pending_event_id(m_event_id);
49
50                 if (!m_ch->is_connected()) {
51                         return false;
52                 }
53
54                 if (condition & (EVENT_IN | EVENT_HUP)) {
55                         return false;
56                 }
57
58                 if (!m_ch->send_sync(*m_msg)) {
59                         return false;
60                 }
61
62                 return false;
63         }
64
65 private:
66         channel *m_ch;
67         std::shared_ptr<message> m_msg;
68 };
69
70 class read_event_handler : public event_handler
71 {
72 public:
73         read_event_handler(channel *ch)
74         : m_ch(ch)
75         { }
76
77         bool handle(int fd, event_condition condition)
78         {
79                 if (!m_ch) {
80                         return false;
81                 }
82
83                 m_ch->remove_pending_event_id(m_event_id);
84
85                 if (!m_ch->is_connected()) {
86                         return false;
87                 }
88
89                 if (condition & (EVENT_OUT | EVENT_HUP)) {
90                         return false;
91                 }
92
93                 message msg;
94                 if (!m_ch->read_sync(msg, false)) {
95                         return false;
96                 }
97
98                 return false;
99         }
100
101 private:
102         channel *m_ch;
103 };
104
105 channel::channel(socket *sock)
106 : m_fd(sock->get_fd())
107 , m_event_id(0)
108 , m_socket(sock)
109 , m_handler(NULL)
110 , m_loop(NULL)
111 , m_connected(false)
112 {
113         _D("Created");
114 }
115
116 channel::~channel()
117 {
118         _D("Destroyed[%llu]", m_event_id);
119         disconnect();
120 }
121
122 uint64_t channel::bind(void)
123 {
124         retv_if(!m_loop, 0);
125         m_event_id = m_loop->add_event(m_socket->get_fd(),
126                         (EVENT_IN | EVENT_HUP | EVENT_NVAL),
127                         dynamic_cast<channel_event_handler *>(m_handler));
128
129         _D("Bound[%llu]", m_event_id);
130         return m_event_id;
131 }
132
133 uint64_t channel::bind(channel_handler *handler, event_loop *loop, bool loop_bind)
134 {
135         m_handler = handler;
136         m_loop = loop;
137         m_connected.store(true);
138
139         if (m_handler)
140                 m_handler->connected(this);
141
142         if (loop_bind)
143                 bind();
144
145         return m_event_id;
146 }
147
148 uint64_t channel::connect(channel_handler *handler, event_loop *loop, bool loop_bind)
149 {
150         if (!m_socket->connect())
151                 return false;
152
153         bind(handler, loop, loop_bind);
154
155         _D("Connected[%llu]", m_event_id);
156         return m_event_id;
157 }
158
159 void channel::disconnect(void)
160 {
161         if (!is_connected()) {
162                 _D("Channel is not connected");
163                 return;
164         }
165
166         m_connected.store(false);
167
168         _D("Disconnecting..[%llu]", m_event_id);
169
170         if (m_handler) {
171                 m_handler->disconnected(this);
172                 m_handler = NULL;
173         }
174
175         if (m_loop) {
176                 for(auto id : m_pending_event_id) {
177                         _D("Remove pending event id[%llu]", id);
178                         m_loop->remove_event(id, true);
179                 }
180                 _D("Remove event[%llu]", m_event_id);
181                 m_loop->remove_event(m_event_id, true);
182                 m_loop = NULL;
183                 m_event_id = 0;
184         }
185
186         if (m_socket) {
187                 _D("Release socket[%d]", m_socket->get_fd());
188                 delete m_socket;
189                 m_socket = NULL;
190         }
191
192         _D("Disconnected");
193 }
194
195 bool channel::send(std::shared_ptr<message> msg)
196 {
197         int retry_cnt = 0;
198         int cur_buffer_size = 0;
199
200         retv_if(!m_loop, false);
201
202         while (retry_cnt < 3) {
203                 cur_buffer_size = m_socket->get_current_buffer_size();
204                 if (cur_buffer_size <= SYSTEMD_SOCK_BUF_SIZE)
205                         break;
206                 usleep(3000);
207                 retry_cnt++;
208         }
209         retvm_if(retry_cnt >= 3, false, "Socket buffer[%d] is exceeded", cur_buffer_size);
210
211         send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
212         retvm_if(!handler, false, "Failed to allocate memory");
213
214         uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL), handler);
215         if (event_id == 0) {
216                 _D("Failed to add send event handler");
217                 delete handler;
218                 return false;
219         }
220
221         m_pending_event_id.push_back(event_id);
222         return true;
223 }
224
225 bool channel::send_sync(message &msg)
226 {
227         retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
228
229         ssize_t size = 0;
230         char *buf = msg.body();
231
232         /* header */
233         size = m_socket->send(reinterpret_cast<void *>(msg.header()),
234             sizeof(message_header), true);
235         retvm_if(size <= 0, false, "Failed to send header");
236
237         /* if body size is zero, skip to send body message */
238         retv_if(msg.size() == 0, true);
239
240         /* body */
241         size = m_socket->send(buf, msg.size(), true);
242         retvm_if(size <= 0, false, "Failed to send body");
243
244         return true;
245 }
246
247 bool channel::read(void)
248 {
249         retv_if(!m_loop, false);
250
251         read_event_handler *handler = new(std::nothrow) read_event_handler(this);
252         retvm_if(!handler, false, "Failed to allocate memory");
253
254         uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
255         if (event_id == 0) {
256                 _D("Failed to add read event handler");
257                 delete handler;
258                 return false;
259         }
260
261         m_pending_event_id.push_back(event_id);
262         return true;
263 }
264
265 bool channel::read_sync(message &msg, bool select)
266 {
267         message_header header;
268         ssize_t size = 0;
269         char buf[MAX_MSG_CAPACITY];
270
271         /* header */
272         size = m_socket->recv(&header, sizeof(message_header), select);
273         retv_if(size <= 0, false);
274
275         /* check error from header */
276         if (m_handler && header.err != 0) {
277                 m_handler->error_caught(this, header.err);
278                 msg.header()->err = header.err;
279                 return true;
280         }
281
282         /* body */
283         if (header.length >= MAX_MSG_CAPACITY) {
284                 _E("header.length error %u", header.length);
285                 return false;
286         }
287
288         if (header.length > 0) {
289                 size = m_socket->recv(&buf, header.length, select);
290                 retv_if(size <= 0, false);
291         }
292
293         buf[header.length] = '\0';
294         msg.enclose(reinterpret_cast<const void *>(buf), header.length);
295         msg.set_type(header.type);
296         msg.header()->err = header.err;
297
298         if (m_handler)
299                 m_handler->read(this, msg);
300
301         return true;
302 }
303
304 bool channel::is_connected(void)
305 {
306         return m_connected.load();
307 }
308
309 bool channel::set_option(int type, int value)
310 {
311         switch (type) {
312         case SO_SNDBUF:
313                 m_socket->set_buffer_size(type, value);
314                 break;
315         case SO_RCVBUF:
316                 m_socket->set_buffer_size(type, value);
317                 break;
318         default:
319                 break;
320         }
321
322         return true;
323 }
324
325 bool channel::get_option(int type, int &value) const
326 {
327         switch (type) {
328         case 0:
329                 value = m_socket->get_current_buffer_size();
330                 break;
331         case SO_SNDBUF:
332                 value = m_socket->get_buffer_size(type);
333                 break;
334         case SO_RCVBUF:
335                 value = m_socket->get_buffer_size(type);
336                 break;
337         default:
338                 break;
339         }
340
341         return true;
342 }
343
344 int channel::get_fd(void) const
345 {
346         return m_fd;
347 }
348
349 void channel::remove_pending_event_id(uint64_t id)
350 {
351         auto it = std::find(m_pending_event_id.begin(), m_pending_event_id.end(), id);
352         if (it != m_pending_event_id.end()) {
353                 m_pending_event_id.erase(it);
354         }
355 }