sensord: add checking code whether socket is valid or not
[platform/core/system/sensord.git] / src / server / csensor_event_dispatcher.cpp
1 /*
2  * libsensord-share
3  *
4  * Copyright (c) 2014 Samsung Electronics Co., Ltd.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  */
19
20 #include <csensor_event_dispatcher.h>
21 #include <sensor_plugin_loader.h>
22 #include <sensor_logs.h>
23 #include <sf_common.h>
24 #include <thread>
25 #include <vector>
26
27 using std::thread;
28 using std::vector;
29
30 #define MAX_PENDING_CONNECTION 32
31
32 csensor_event_dispatcher::csensor_event_dispatcher()
33 {
34 }
35
36 csensor_event_dispatcher::~csensor_event_dispatcher() { }
37
38
39 csensor_event_dispatcher& csensor_event_dispatcher::get_instance()
40 {
41         static csensor_event_dispatcher inst;
42         return inst;
43 }
44
45
46 bool csensor_event_dispatcher::run(void)
47 {
48         INFO("Starting Event Dispatcher\n");
49
50         if (!m_accept_socket.create(SOCK_SEQPACKET)) {
51                 ERR("Listener Socket Creation failed in Server \n");
52                 return false;
53         }
54
55         if(!m_accept_socket.bind(EVENT_CHANNEL_PATH)) {
56                 ERR("Listener Socket Binding failed in Server \n");
57                 m_accept_socket.close();
58                 return false;
59         }
60
61         if(!m_accept_socket.listen(MAX_PENDING_CONNECTION)) {
62                 ERR("Socket Listen failed in Server \n");
63                 return false;
64         }
65
66         thread accepter(&csensor_event_dispatcher::accept_connections, this);
67         accepter.detach();
68
69         thread dispatcher(&csensor_event_dispatcher::dispatch_event, this);
70         dispatcher.detach();
71
72         return true;
73 }
74
75 void csensor_event_dispatcher::accept_event_channel(csocket client_socket)
76 {
77         int client_id;
78         event_channel_ready_t event_channel_ready;
79         cclient_info_manager& client_info_manager = get_client_info_manager();
80
81         client_socket.set_connection_mode();
82
83         if (client_socket.recv(&client_id, sizeof(client_id)) <= 0) {
84                 ERR("Failed to receive client id on socket fd[%d]", client_socket.get_socket_fd());
85                 return;
86         }
87
88         client_socket.set_transfer_mode();
89
90         AUTOLOCK(m_mutex);
91
92         if(!get_client_info_manager().set_event_socket(client_id, client_socket)) {
93                 ERR("Failed to store event socket[%d] for %s", client_socket.get_socket_fd(),
94                         client_info_manager.get_client_info(client_id));
95                 return;
96         }
97
98         event_channel_ready.magic = EVENT_CHANNEL_MAGIC;
99         event_channel_ready.client_id = client_id;
100
101         INFO("Event channel is accepted for %s on socket[%d]",
102                 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
103
104         if (client_socket.send(&event_channel_ready, sizeof(event_channel_ready)) <= 0) {
105                 ERR("Failed to send event_channel_ready packet to %s on socket fd[%d]",
106                         client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
107                 return;
108         }
109 }
110
111 void csensor_event_dispatcher::accept_connections(void)
112 {
113         INFO("Event channel acceptor is started.\n");
114
115         while (true) {
116                 csocket client_socket;
117
118                 if (!m_accept_socket.accept(client_socket)) {
119                         ERR("Accepting socket failed in Server \n");
120                         continue;
121                 }
122
123                 INFO("New client connected (socket_fd : %d)\n", client_socket.get_socket_fd());
124
125                 thread event_channel_creator(&csensor_event_dispatcher::accept_event_channel, this, client_socket);
126                 event_channel_creator.detach();
127         }
128 }
129
130 void csensor_event_dispatcher::dispatch_event(void)
131 {
132         const int MAX_EVENT_PER_SENSOR = 16;
133         const int MAX_SENSOR_EVENT = 1 + (sensor_plugin_loader::get_instance().get_virtual_sensors().size()
134                 * MAX_EVENT_PER_SENSOR);
135         const int MAX_SYNTH_PER_SENSOR = 5;
136
137         vector<sensor_event_t> v_sensor_events(MAX_SYNTH_PER_SENSOR);
138
139         INFO("Event Dispatcher started");
140
141         while (true) {
142                 bool is_hub_event = false;
143
144                 void *seed_event = get_event_queue().pop();
145                 unsigned int event_type = *((unsigned int *)(seed_event));
146
147                 if (is_sensorhub_event(event_type))
148                         is_hub_event = true;
149
150                 if (is_hub_event) {
151                         sensorhub_event_t *sensorhub_event = (sensorhub_event_t *)seed_event;
152                         send_sensor_events(sensorhub_event, 1, true);
153                 } else {
154                         sensor_event_t sensor_events[MAX_SENSOR_EVENT];
155                         unsigned int event_cnt = 0;
156                         sensor_events[event_cnt++] = *((sensor_event_t *)seed_event);
157
158                         virtual_sensors v_sensors = get_active_virtual_sensors();
159
160                         auto it_v_sensor = v_sensors.begin();
161
162                         while (it_v_sensor != v_sensors.end()) {
163                                 int synthesized_cnt;
164                                 v_sensor_events.clear();
165                                 (*it_v_sensor)->synthesize(*((sensor_event_t *)seed_event), v_sensor_events);
166                                 synthesized_cnt = v_sensor_events.size();
167
168                                 for (int i = 0; i < synthesized_cnt; ++i)
169                                         sensor_events[event_cnt++] = v_sensor_events[i];
170
171                                 ++it_v_sensor;
172                         }
173
174                         sort_sensor_events(sensor_events, event_cnt);
175
176                         for (unsigned int i = 0; i < event_cnt; ++i) {
177                                 if (is_record_event(sensor_events[i].event_type))
178                                         put_last_event(sensor_events[i].event_type, sensor_events[i]);
179                         }
180
181                         send_sensor_events(sensor_events, event_cnt, false);
182                 }
183
184                 if (is_hub_event)
185                         delete (sensorhub_event_t *)seed_event;
186                 else
187                         delete (sensor_event_t *)seed_event;
188         }
189 }
190
191
192 void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, bool is_hub_event)
193 {
194         sensor_event_t *sensor_events = NULL;
195         sensorhub_event_t *sensor_hub_events = NULL;
196         cclient_info_manager& client_info_manager = get_client_info_manager();
197
198         const int RESERVED_CLIENT_CNT = 20;
199         static client_id_vec id_vec(RESERVED_CLIENT_CNT);
200
201         if (is_hub_event)
202                 sensor_hub_events = (sensorhub_event_t *)events;
203         else
204                 sensor_events = (sensor_event_t *)events;
205
206         for (int i = 0; i < event_cnt; ++i) {
207                 sensor_id_t sensor_id;
208                 unsigned int event_type;
209
210                 if (is_hub_event) {
211                         sensor_id = sensor_hub_events[i].sensor_id;
212                         event_type = sensor_hub_events[i].event_type;
213                 } else {
214                         sensor_id = sensor_events[i].sensor_id;
215                         event_type = sensor_events[i].event_type;
216                 }
217
218                 id_vec.clear();
219                 client_info_manager.get_listener_ids(sensor_id, event_type, id_vec);
220
221                 auto it_client_id = id_vec.begin();
222
223                 while (it_client_id != id_vec.end()) {
224                         csocket client_socket;
225                         bool ret;
226
227                         if (!client_info_manager.get_event_socket(*it_client_id, client_socket)) {
228                                 ++it_client_id;
229                                 continue;
230                         }
231
232                         if (is_hub_event)
233                                 ret = (client_socket.send(sensor_hub_events + i, sizeof(sensorhub_event_t)) > 0);
234                         else
235                                 ret = (client_socket.send(sensor_events + i, sizeof(sensor_event_t)) > 0);
236
237                         if (ret)
238                                 DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
239                         else
240                                 ERR("Failed to send event[0x%x] to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
241
242                         ++it_client_id;
243                 }
244         }
245 }
246
247 cclient_info_manager& csensor_event_dispatcher::get_client_info_manager(void)
248 {
249         return cclient_info_manager::get_instance();
250 }
251
252 csensor_event_queue& csensor_event_dispatcher::get_event_queue(void)
253 {
254         return csensor_event_queue::get_instance();
255 }
256
257 bool csensor_event_dispatcher::is_record_event(unsigned int event_type)
258 {
259         return false;
260 }
261
262 void csensor_event_dispatcher::put_last_event(unsigned int event_type, const sensor_event_t &event)
263 {
264         AUTOLOCK(m_last_events_mutex);
265         m_last_events[event_type] = event;
266 }
267
268 bool csensor_event_dispatcher::get_last_event(unsigned int event_type, sensor_event_t &event)
269 {
270         AUTOLOCK(m_last_events_mutex);
271
272         auto it_event = m_last_events.find(event_type);
273
274         if (it_event == m_last_events.end())
275                 return false;
276
277         event = it_event->second;
278         return true;
279 }
280
281 bool csensor_event_dispatcher::has_active_virtual_sensor(virtual_sensor *sensor)
282 {
283         AUTOLOCK(m_active_virtual_sensors_mutex);
284
285         auto it_v_sensor = find(m_active_virtual_sensors.begin(), m_active_virtual_sensors.end(), sensor);
286
287         return (it_v_sensor != m_active_virtual_sensors.end());
288 }
289
290
291 virtual_sensors csensor_event_dispatcher::get_active_virtual_sensors(void)
292 {
293         AUTOLOCK(m_active_virtual_sensors_mutex);
294
295         return m_active_virtual_sensors;
296 }
297
298 void csensor_event_dispatcher::sort_sensor_events(sensor_event_t *events, unsigned int cnt)
299 {
300         std::sort(events, events + cnt,
301                 [](const sensor_event_t& a, const sensor_event_t &b)->bool {
302                         return a.data.timestamp < b.data.timestamp;
303                 }
304         );
305 }
306
307
308 void csensor_event_dispatcher::request_last_event(int client_id, sensor_id_t sensor_id)
309 {
310         cclient_info_manager& client_info_manager = get_client_info_manager();
311         event_type_vector event_vec;
312         csocket client_socket;
313
314         if (client_info_manager.get_registered_events(client_id, sensor_id, event_vec)) {
315                 if (!client_info_manager.get_event_socket(client_id, client_socket)) {
316                         ERR("Failed to get event socket from %s",
317                                         client_info_manager.get_client_info(client_id));
318                         return;
319                 }
320
321                 auto it_event = event_vec.begin();
322                 while (it_event != event_vec.end()) {
323                         sensor_event_t event;
324                         if (is_record_event(*it_event) && get_last_event(*it_event, event)) {
325                                 if (client_socket.send(&event, sizeof(event)) > 0)
326                                         INFO("Send the last event[0x%x] to %s on socket[%d]", event.event_type,
327                                                 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
328                                 else
329                                         ERR("Failed to send event[0x%x] to %s on socket[%d]", event.event_type,
330                                                 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
331                         }
332                         ++it_event;
333                 }
334         }
335 }
336
337
338 bool csensor_event_dispatcher::add_active_virtual_sensor(virtual_sensor * sensor)
339 {
340         AUTOLOCK(m_active_virtual_sensors_mutex);
341
342         if (has_active_virtual_sensor(sensor)) {
343                 ERR("[%s] sensor is already added on active virtual sensors", sensor->get_name());
344                 return false;
345         }
346
347         m_active_virtual_sensors.push_back(sensor);
348
349         return true;
350 }
351
352 bool csensor_event_dispatcher::delete_active_virtual_sensor(virtual_sensor * sensor)
353 {
354         AUTOLOCK(m_active_virtual_sensors_mutex);
355
356         auto it_v_sensor = find(m_active_virtual_sensors.begin(), m_active_virtual_sensors.end(), sensor);
357
358         if (it_v_sensor == m_active_virtual_sensors.end()) {
359                 ERR("Fail to delete non-existent [%s] sensor on active virtual sensors", sensor->get_name());
360                 return false;
361         }
362
363         m_active_virtual_sensors.erase(it_v_sensor);
364
365         return true;
366 }