4 * Copyright (c) 2014 Samsung Electronics Co., Ltd.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
20 #include <csensor_event_dispatcher.h>
21 #include <sensor_plugin_loader.h>
22 #include <sensor_logs.h>
23 #include <sf_common.h>
// Value passed to m_accept_socket.listen() in run().
30 #define MAX_PENDING_CONNECTION 32
// Default constructor.
// NOTE(review): the constructor body is not shown here - confirm it does no work.
32 csensor_event_dispatcher::csensor_event_dispatcher()
// Destructor with an empty body: it performs no explicit cleanup itself.
36 csensor_event_dispatcher::~csensor_event_dispatcher() { }
// Accessor for the single dispatcher object, which is held in a
// function-local static.
// NOTE(review): the return statement is not shown - presumably returns `inst`.
39 csensor_event_dispatcher& csensor_event_dispatcher::get_instance()
41 static csensor_event_dispatcher inst;
// Starts the event dispatcher: creates a SOCK_SEQPACKET socket, binds it to
// EVENT_CHANNEL_PATH, puts it into listening state, and then creates one
// thread for accept_connections() and one for dispatch_event().
// NOTE(review): the return statements are not shown - presumably false on
// each failure path and true once both threads are started; confirm.
46 bool csensor_event_dispatcher::run(void)
48 INFO("Starting Event Dispatcher\n");
// Step 1: create the listening socket.
50 if (!m_accept_socket.create(SOCK_SEQPACKET)) {
51 ERR("Listener Socket Creation failed in Server \n");
// Step 2: bind it to the event channel path; the socket is closed if binding fails.
55 if(!m_accept_socket.bind(EVENT_CHANNEL_PATH)) {
56 ERR("Listener Socket Binding failed in Server \n");
57 m_accept_socket.close();
// Step 3: start listening with MAX_PENDING_CONNECTION as the backlog argument.
// NOTE(review): unlike the bind failure path, no close() is visible on this
// failure path - verify the socket is not left open.
61 if(!m_accept_socket.listen(MAX_PENDING_CONNECTION)) {
62 ERR("Socket Listen failed in Server \n");
// Step 4: start the worker threads, both bound to this instance.
// NOTE(review): if `thread` is std::thread, each object must be joined or
// detached before it goes out of scope; those calls are not shown here.
66 thread accepter(&csensor_event_dispatcher::accept_connections, this);
69 thread dispatcher(&csensor_event_dispatcher::dispatch_event, this);
// Completes the event-channel handshake for one accepted client connection:
//   1. receives the client id from the socket,
//   2. registers the socket as that client's event socket,
//   3. replies with an event_channel_ready packet.
// client_socket is taken by value, so this function works on its own copy of
// the csocket object. accept_connections() runs this on a detached thread.
// NOTE(review): the early exits after each ERR are not shown - presumably the
// function returns on every failure; confirm.
75 void csensor_event_dispatcher::accept_event_channel(csocket client_socket)
78 event_channel_ready_t event_channel_ready;
79 cclient_info_manager& client_info_manager = get_client_info_manager();
// The socket is put into connection mode for the id exchange ...
81 client_socket.set_connection_mode();
// `client_id` is declared on a line not shown here; recv() fills it with
// sizeof(client_id) bytes. A result <= 0 is treated as failure.
83 if (client_socket.recv(&client_id, sizeof(client_id)) <= 0) {
84 ERR("Failed to receive client id on socket fd[%d]", client_socket.get_socket_fd());
// ... and switched to transfer mode before it is stored for event delivery.
88 client_socket.set_transfer_mode();
// Same manager object as `client_info_manager` above (both come from
// get_client_info_manager()).
92 if(!get_client_info_manager().set_event_socket(client_id, client_socket)) {
93 ERR("Failed to store event socket[%d] for %s", client_socket.get_socket_fd(),
94 client_info_manager.get_client_info(client_id));
// Build the acknowledgement packet: magic value plus the client id echoed back.
98 event_channel_ready.magic = EVENT_CHANNEL_MAGIC;
99 event_channel_ready.client_id = client_id;
101 INFO("Event channel is accepted for %s on socket[%d]",
102 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
// A send() result <= 0 is treated as failure and logged.
104 if (client_socket.send(&event_channel_ready, sizeof(event_channel_ready)) <= 0) {
105 ERR("Failed to send event_channel_ready packet to %s on socket fd[%d]",
106 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
// Accepts incoming connections on m_accept_socket and hands each accepted
// socket to accept_event_channel() on its own detached thread.
// run() starts this function on a separate thread.
// NOTE(review): the loop construct is not shown here - confirm that the
// accept/dispatch sequence below repeats for every incoming connection and
// what happens after an accept() failure.
111 void csensor_event_dispatcher::accept_connections(void)
113 INFO("Event channel acceptor is started.\n");
116 csocket client_socket;
118 if (!m_accept_socket.accept(client_socket)) {
119 ERR("Accepting socket failed in Server \n");
123 INFO("New client connected (socket_fd : %d)\n", client_socket.get_socket_fd());
// client_socket is passed as a thread argument and accept_event_channel()
// takes it by value, so the handler does not refer to this local object.
// The thread is detached: nothing waits for the handshake to finish.
125 thread event_channel_creator(&csensor_event_dispatcher::accept_event_channel, this, client_socket);
126 event_channel_creator.detach();
// Dispatcher thread body (started from run()). Pops an event from the event
// queue and delivers it:
//   - a sensorhub event is sent as-is via send_sensor_events(..., true);
//   - a sensor event is copied into a local array together with the events
//     that the active virtual sensors synthesize from it; the array is sorted
//     by timestamp, record events are cached via put_last_event(), and the
//     whole batch is sent via send_sensor_events(..., false).
// The popped event is deleted afterwards.
// NOTE(review): the loop and if/else lines are not shown here - the branch
// structure described above is inferred from the visible statements; confirm.
130 void csensor_event_dispatcher::dispatch_event(void)
// Capacity of the local event array: one slot for the popped (seed) event plus
// MAX_EVENT_PER_SENSOR slots for each virtual sensor reported by the plugin loader.
132 const int MAX_EVENT_PER_SENSOR = 16;
133 const int MAX_SENSOR_EVENT = 1 + (sensor_plugin_loader::get_instance().get_virtual_sensors().size()
134 * MAX_EVENT_PER_SENSOR);
135 const int MAX_SYNTH_PER_SENSOR = 5;
// Scratch buffer for synthesized events. This constructor creates
// MAX_SYNTH_PER_SENSOR elements (it is not a reserve()); the vector is
// cleared before every synthesize() call below.
137 vector<sensor_event_t> v_sensor_events(MAX_SYNTH_PER_SENSOR);
139 INFO("Event Dispatcher started");
142 bool is_hub_event = false;
// The popped object is accessed through a void pointer; its first
// unsigned int is read as the event type.
144 void *seed_event = get_event_queue().pop();
145 unsigned int event_type = *((unsigned int *)(seed_event));
147 if (is_sensorhub_event(event_type))
// Sensorhub path: forward the single event unchanged.
151 sensorhub_event_t *sensorhub_event = (sensorhub_event_t *)seed_event;
152 send_sensor_events(sensorhub_event, 1, true);
// Sensor path. MAX_SENSOR_EVENT is computed at run time, so this array has a
// run-time size (a compiler extension in C++, not standard).
154 sensor_event_t sensor_events[MAX_SENSOR_EVENT];
155 unsigned int event_cnt = 0;
156 sensor_events[event_cnt++] = *((sensor_event_t *)seed_event);
// Work on the container returned by get_active_virtual_sensors() rather
// than on the member itself.
158 virtual_sensors v_sensors = get_active_virtual_sensors();
160 auto it_v_sensor = v_sensors.begin();
162 while (it_v_sensor != v_sensors.end()) {
164 v_sensor_events.clear();
165 (*it_v_sensor)->synthesize(*((sensor_event_t *)seed_event), v_sensor_events);
166 synthesized_cnt = v_sensor_events.size();
// NOTE(review): no check of event_cnt against MAX_SENSOR_EVENT is visible
// here - verify that synthesized events cannot overflow sensor_events.
168 for (int i = 0; i < synthesized_cnt; ++i)
169 sensor_events[event_cnt++] = v_sensor_events[i];
// Order the batch by timestamp before caching and sending.
174 sort_sensor_events(sensor_events, event_cnt);
// Remember the latest event of each record-type event for request_last_event().
176 for (unsigned int i = 0; i < event_cnt; ++i) {
177 if (is_record_event(sensor_events[i].event_type))
178 put_last_event(sensor_events[i].event_type, sensor_events[i]);
181 send_sensor_events(sensor_events, event_cnt, false);
// Release the popped event using the pointer type that matches its kind.
// NOTE(review): the condition selecting between the two deletes is not shown.
185 delete (sensorhub_event_t *)seed_event;
187 delete (sensor_event_t *)seed_event;
// Sends each of the `event_cnt` events in `events` to every client that
// listens for the event's (sensor_id, event_type) pair.
//   events       - array of sensorhub_event_t or of sensor_event_t
//   event_cnt    - number of elements in `events`
//   is_hub_event - NOTE(review): presumably selects the sensorhub_event_t
//                  interpretation; the if/else lines are not shown here.
192 void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, bool is_hub_event)
194 sensor_event_t *sensor_events = NULL;
195 sensorhub_event_t *sensor_hub_events = NULL;
196 cclient_info_manager& client_info_manager = get_client_info_manager();
// id_vec is a function-local static, so the same vector object is reused on
// every call. It is constructed with RESERVED_CLIENT_CNT elements.
// NOTE(review): confirm that get_listener_ids() resets its contents, and
// that this function is only ever called from one thread.
198 const int RESERVED_CLIENT_CNT = 20;
199 static client_id_vec id_vec(RESERVED_CLIENT_CNT);
// View `events` through the pointer type that matches the event kind.
202 sensor_hub_events = (sensorhub_event_t *)events;
204 sensor_events = (sensor_event_t *)events;
206 for (int i = 0; i < event_cnt; ++i) {
207 sensor_id_t sensor_id;
208 unsigned int event_type;
// Read the routing keys from whichever array is in use.
211 sensor_id = sensor_hub_events[i].sensor_id;
212 event_type = sensor_hub_events[i].event_type;
214 sensor_id = sensor_events[i].sensor_id;
215 event_type = sensor_events[i].event_type;
// Collect the ids of the clients listening for this event.
219 client_info_manager.get_listener_ids(sensor_id, event_type, id_vec);
221 auto it_client_id = id_vec.begin();
223 while (it_client_id != id_vec.end()) {
224 csocket client_socket;
// NOTE(review): the handling of a failed socket lookup is not shown here.
227 if (!client_info_manager.get_event_socket(*it_client_id, client_socket)) {
// Send exactly one event structure; `ret` is true when send() returns > 0.
233 ret = (client_socket.send(sensor_hub_events + i, sizeof(sensorhub_event_t)) > 0);
235 ret = (client_socket.send(sensor_events + i, sizeof(sensor_event_t)) > 0);
238 DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
240 ERR("Failed to send event[0x%x] to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
// Convenience accessor: returns the object provided by
// cclient_info_manager::get_instance().
247 cclient_info_manager& csensor_event_dispatcher::get_client_info_manager(void)
249 return cclient_info_manager::get_instance();
// Convenience accessor: returns the object provided by
// csensor_event_queue::get_instance().
252 csensor_event_queue& csensor_event_dispatcher::get_event_queue(void)
254 return csensor_event_queue::get_instance();
// Predicate on an event type. dispatch_event() caches an event via
// put_last_event() only when this returns true, and request_last_event()
// replays a cached event only for such types.
// NOTE(review): the body is not shown here - the actual criteria must be
// confirmed against the full source.
257 bool csensor_event_dispatcher::is_record_event(unsigned int event_type)
// Stores `event` as the most recent event for `event_type` in m_last_events,
// replacing any previous entry for that type.
262 void csensor_event_dispatcher::put_last_event(unsigned int event_type, const sensor_event_t &event)
// AUTOLOCK presumably holds m_last_events_mutex for the rest of the scope - confirm.
264 AUTOLOCK(m_last_events_mutex);
265 m_last_events[event_type] = event;
// Looks up the cached event for `event_type` in m_last_events and, when one
// exists, copies it into the out-parameter `event`.
// NOTE(review): the return statements are not shown - presumably false when
// no entry exists and true after the copy; confirm.
268 bool csensor_event_dispatcher::get_last_event(unsigned int event_type, sensor_event_t &event)
// AUTOLOCK presumably holds m_last_events_mutex for the rest of the scope - confirm.
270 AUTOLOCK(m_last_events_mutex);
272 auto it_event = m_last_events.find(event_type);
// No event of this type has been stored.
274 if (it_event == m_last_events.end())
277 event = it_event->second;
// Returns true when the pointer `sensor` is present in
// m_active_virtual_sensors (elements are compared with the given pointer by find()).
281 bool csensor_event_dispatcher::has_active_virtual_sensor(virtual_sensor *sensor)
// AUTOLOCK presumably holds m_active_virtual_sensors_mutex for the rest of the scope - confirm.
283 AUTOLOCK(m_active_virtual_sensors_mutex);
285 auto it_v_sensor = find(m_active_virtual_sensors.begin(), m_active_virtual_sensors.end(), sensor);
287 return (it_v_sensor != m_active_virtual_sensors.end());
// Returns m_active_virtual_sensors by value, so the caller receives its own
// container and can iterate it independently of the member.
291 virtual_sensors csensor_event_dispatcher::get_active_virtual_sensors(void)
// AUTOLOCK presumably holds m_active_virtual_sensors_mutex while the copy is made - confirm.
293 AUTOLOCK(m_active_virtual_sensors_mutex);
295 return m_active_virtual_sensors;
// Sorts the first `cnt` elements of `events` in place, in ascending order of
// data.timestamp. std::sort does not guarantee that events with equal
// timestamps keep their relative order.
298 void csensor_event_dispatcher::sort_sensor_events(sensor_event_t *events, unsigned int cnt)
300 std::sort(events, events + cnt,
301 [](const sensor_event_t& a, const sensor_event_t &b)->bool {
302 return a.data.timestamp < b.data.timestamp;
// Replays cached events to one client: for every event type the client has
// registered on `sensor_id`, sends the cached last event of that type over
// the client's event socket, provided the type is a record event and an
// event has been cached for it.
308 void csensor_event_dispatcher::request_last_event(int client_id, sensor_id_t sensor_id)
310 cclient_info_manager& client_info_manager = get_client_info_manager();
311 event_type_vector event_vec;
312 csocket client_socket;
// Nothing is sent unless the registered-events lookup succeeds.
314 if (client_info_manager.get_registered_events(client_id, sensor_id, event_vec)) {
// NOTE(review): the exit after this error is not shown - presumably the
// function returns when the client has no event socket; confirm.
315 if (!client_info_manager.get_event_socket(client_id, client_socket)) {
316 ERR("Failed to get event socket from %s",
317 client_info_manager.get_client_info(client_id));
321 auto it_event = event_vec.begin();
322 while (it_event != event_vec.end()) {
323 sensor_event_t event;
// get_last_event() fills `event` only when a cached entry exists.
324 if (is_record_event(*it_event) && get_last_event(*it_event, event)) {
// A send() result > 0 is treated as success; the outcome is only logged.
325 if (client_socket.send(&event, sizeof(event)) > 0)
326 INFO("Send the last event[0x%x] to %s on socket[%d]", event.event_type,
327 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
329 ERR("Failed to send event[0x%x] to %s on socket[%d]", event.event_type,
330 client_info_manager.get_client_info(client_id), client_socket.get_socket_fd());
// Appends `sensor` to m_active_virtual_sensors unless it is already present,
// in which case an error is logged.
// NOTE(review): the return statements are not shown - presumably false for a
// duplicate and true after the push_back; confirm.
338 bool csensor_event_dispatcher::add_active_virtual_sensor(virtual_sensor * sensor)
340 AUTOLOCK(m_active_virtual_sensors_mutex);
// has_active_virtual_sensor() applies AUTOLOCK to the same mutex again.
// NOTE(review): this requires the mutex to be re-entrant - confirm.
342 if (has_active_virtual_sensor(sensor)) {
343 ERR("[%s] sensor is already added on active virtual sensors", sensor->get_name());
347 m_active_virtual_sensors.push_back(sensor);
// Removes `sensor` from m_active_virtual_sensors; logs an error when the
// pointer is not in the list.
// NOTE(review): the return statements are not shown - presumably false when
// the sensor is missing and true after the erase; confirm.
352 bool csensor_event_dispatcher::delete_active_virtual_sensor(virtual_sensor * sensor)
// AUTOLOCK presumably holds m_active_virtual_sensors_mutex for the rest of the scope - confirm.
354 AUTOLOCK(m_active_virtual_sensors_mutex);
356 auto it_v_sensor = find(m_active_virtual_sensors.begin(), m_active_virtual_sensors.end(), sensor);
358 if (it_v_sensor == m_active_virtual_sensors.end()) {
359 ERR("Fail to delete non-existent [%s] sensor on active virtual sensors", sensor->get_name());
363 m_active_virtual_sensors.erase(it_v_sensor);