*/
#include <csensor_event_dispatcher.h>
-#include <sensor_plugin_loader.h>
#include <sensor_logs.h>
#include <sf_common.h>
#include <thread>
-#include <vector>
using std::thread;
using std::vector;
+using std::pair;
#define MAX_PENDING_CONNECTION 32
void csensor_event_dispatcher::dispatch_event(void)
{
- const int MAX_EVENT_PER_SENSOR = 16;
- const int MAX_SENSOR_EVENT = 1 + (sensor_plugin_loader::get_instance().get_virtual_sensors().size()
- * MAX_EVENT_PER_SENSOR);
const int MAX_SYNTH_PER_SENSOR = 5;
vector<sensor_event_t> v_sensor_events(MAX_SYNTH_PER_SENSOR);
while (true) {
bool is_hub_event = false;
-
- void *seed_event = get_event_queue().pop();
+ int seed_event_len = 0;
+ void *seed_event = get_event_queue().pop(&seed_event_len);
unsigned int event_type = *((unsigned int *)(seed_event));
if (is_sensorhub_event(event_type))
if (is_hub_event) {
sensorhub_event_t *sensorhub_event = (sensorhub_event_t *)seed_event;
- send_sensor_events(sensorhub_event, 1, true);
+ send_sensorhub_events(sensorhub_event);
} else {
- sensor_event_t sensor_events[MAX_SENSOR_EVENT];
- unsigned int event_cnt = 0;
- sensor_events[event_cnt++] = *((sensor_event_t *)seed_event);
+ vector< pair<void*, int> > sensor_events;
+ sensor_events.push_back(pair<void*, int>(seed_event, seed_event_len));
virtual_sensors v_sensors = get_active_virtual_sensors();
(*it_v_sensor)->synthesize(*((sensor_event_t *)seed_event), v_sensor_events);
synthesized_cnt = v_sensor_events.size();
- for (int i = 0; i < synthesized_cnt; ++i)
- sensor_events[event_cnt++] = v_sensor_events[i];
+ for (int i = 0; i < synthesized_cnt; ++i) {
+ sensor_event_t *v_event = (sensor_event_t*)malloc(sizeof(sensor_event_t));
+ if (!v_event) {
+ ERR("Failed to allocate memory");
+ continue;
+ }
+
+ memcpy(v_event, &v_sensor_events[i], sizeof(sensor_event_t));
+ sensor_events.push_back(pair<void*, int>(v_event, sizeof(sensor_event_t)));
+ }
++it_v_sensor;
}
- sort_sensor_events(sensor_events, event_cnt);
+ sort_sensor_events(sensor_events);
- for (unsigned int i = 0; i < event_cnt; ++i) {
- if (is_record_event(sensor_events[i].event_type))
- put_last_event(sensor_events[i].event_type, sensor_events[i]);
+ for (unsigned int i = 0; i < sensor_events.size(); ++i) {
+ if (is_record_event(((sensor_event_t*)(sensor_events[i].first))->event_type))
+ put_last_event(((sensor_event_t*)(sensor_events[i].first))->event_type, *((sensor_event_t*)(sensor_events[i].first)));
}
- send_sensor_events(sensor_events, event_cnt, false);
+ send_sensor_events(sensor_events);
}
-
- if (is_hub_event)
- delete (sensorhub_event_t *)seed_event;
- else
- delete (sensor_event_t *)seed_event;
}
}
-void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, bool is_hub_event)
+void csensor_event_dispatcher::send_sensor_events(vector< pair<void*, int> > &events)
{
sensor_event_t *sensor_events = NULL;
- sensorhub_event_t *sensor_hub_events = NULL;
cclient_info_manager& client_info_manager = get_client_info_manager();
const int RESERVED_CLIENT_CNT = 20;
static client_id_vec id_vec(RESERVED_CLIENT_CNT);
- if (is_hub_event)
- sensor_hub_events = (sensorhub_event_t *)events;
- else
- sensor_events = (sensor_event_t *)events;
-
- for (int i = 0; i < event_cnt; ++i) {
+ for (unsigned int i = 0; i < events.size(); ++i) {
sensor_id_t sensor_id;
unsigned int event_type;
-
- if (is_hub_event) {
- sensor_id = sensor_hub_events[i].sensor_id;
- event_type = sensor_hub_events[i].event_type;
- } else {
- sensor_id = sensor_events[i].sensor_id;
- event_type = sensor_events[i].event_type;
- }
+ sensor_events = (sensor_event_t*)events[i].first;
+ int length = events[i].second;
+ sensor_id = sensor_events->sensor_id;
+ event_type = sensor_events->event_type;
id_vec.clear();
client_info_manager.get_listener_ids(sensor_id, event_type, id_vec);
while (it_client_id != id_vec.end()) {
csocket client_socket;
- bool ret;
-
- if (!client_info_manager.get_event_socket(*it_client_id, client_socket)) {
- ++it_client_id;
- continue;
- }
-
- if (is_hub_event)
- ret = (client_socket.send(sensor_hub_events + i, sizeof(sensorhub_event_t)) > 0);
- else
- ret = (client_socket.send(sensor_events + i, sizeof(sensor_event_t)) > 0);
+ client_info_manager.get_event_socket(*it_client_id, client_socket);
+ bool ret = (client_socket.send(sensor_events, length) > 0);
if (ret)
DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
++it_client_id;
}
+
+ free(sensor_events);
}
}
+void csensor_event_dispatcher::send_sensorhub_events(void* events)
+{
+ sensorhub_event_t *sensor_hub_events;
+ cclient_info_manager& client_info_manager = get_client_info_manager();
+
+ const int RESERVED_CLIENT_CNT = 20;
+ static client_id_vec id_vec(RESERVED_CLIENT_CNT);
+
+ sensor_hub_events = (sensorhub_event_t *)events;
+
+ sensor_id_t sensor_id;
+ unsigned int event_type;
+
+ sensor_id = sensor_hub_events->sensor_id;
+ event_type = sensor_hub_events->event_type;
+
+ id_vec.clear();
+ client_info_manager.get_listener_ids(sensor_id, event_type, id_vec);
+
+ auto it_client_id = id_vec.begin();
+
+ while (it_client_id != id_vec.end()) {
+ csocket client_socket;
+ client_info_manager.get_event_socket(*it_client_id, client_socket);
+ bool ret = (client_socket.send(sensor_hub_events, sizeof(sensorhub_event_t)) > 0);
+
+ if (ret)
+ DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
+ else
+ ERR("Failed to send event[0x%x] to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
+
+ ++it_client_id;
+ }
+
+ free(sensor_hub_events);
+}
+
cclient_info_manager& csensor_event_dispatcher::get_client_info_manager(void)
{
return cclient_info_manager::get_instance();
return m_active_virtual_sensors;
}
-void csensor_event_dispatcher::sort_sensor_events(sensor_event_t *events, unsigned int cnt)
+struct sort_comp {
+ bool operator()(const pair<void*, int> &left, const pair<void*, int> &right) {
+ return ((sensor_event_t*)(left.first))->data.timestamp < ((sensor_event_t*)(right.first))->data.timestamp;
+ }
+};
+
+void csensor_event_dispatcher::sort_sensor_events(vector< pair<void*, int> > &events)
{
- std::sort(events, events + cnt,
- [](const sensor_event_t& a, const sensor_event_t &b)->bool {
- return a.data.timestamp < b.data.timestamp;
- }
- );
+ std::sort(events.begin(), events.end(), sort_comp());
}
-
void csensor_event_dispatcher::request_last_event(int client_id, sensor_id_t sensor_id)
{
cclient_info_manager& client_info_manager = get_client_info_manager();
return inst;
}
-void csensor_event_queue::push(const sensor_event_t &event)
-{
- sensor_event_t *new_event = new(std::nothrow) sensor_event_t;
- retm_if(!new_event, "Failed to allocate memory");
- *new_event = event;
-
- push_internal(new_event);
-}
-
-void csensor_event_queue::push(sensor_event_t *event)
-{
- push_internal(event);
-}
-
-void csensor_event_queue::push(const sensorhub_event_t &event)
-{
- sensorhub_event_t *new_event = new(std::nothrow) sensorhub_event_t;
- retm_if(!new_event, "Failed to allocate memory");
- *new_event = event;
-
- push_internal(new_event);
-}
-
-void csensor_event_queue::push(sensorhub_event_t *event)
-{
- push_internal(event);
-}
-
-void csensor_event_queue::push_internal(void *event)
+void csensor_event_queue::push_internal(void *event, int length)
{
lock l(m_mutex);
bool wake = m_queue.empty();
if (m_queue.size() >= QUEUE_FULL_SIZE) {
ERR("Queue is full, drop it!");
-
- unsigned int event_type = *((unsigned int *)(event));
-
- if (is_sensorhub_event(event_type))
- delete (sensorhub_event_t *)event;
- else
- delete (sensor_event_t *)event;
+ free(event);
} else
- m_queue.push(event);
+ m_queue.push(std::pair<void*, int>(event, length));
if (wake)
m_cond_var.notify_one();
}
-void* csensor_event_queue::pop(void)
+void* csensor_event_queue::pop(int *length)
{
ulock u(m_mutex);
while (m_queue.empty())
m_cond_var.wait(u);
- void* event = m_queue.top();
+ std::pair<void*, int> event = m_queue.top();
m_queue.pop();
- return event;
+
+ *length = event.second;
+ return event.first;
}
#if !defined(_CSENSOR_EVENT_QUEUE_CLASS_H_)
#define _CSENSOR_EVENT_QUEUE_CLASS_H_
#include <sf_common.h>
+#include <cstring>
+#include <utility>
#include <queue>
#include <mutex>
#include <condition_variable>
class compare {
public:
- bool operator() (void* &v1,void *&v2) {
- sensor_event_t *e1 = (sensor_event_t *)v1;
- sensor_event_t *e2 = (sensor_event_t *)v2;
+ bool operator() (std::pair<void *, int> v1, std::pair<void *, int> v2) {
+ sensor_event_t *e1 = (sensor_event_t *)v1.first;
+ sensor_event_t *e2 = (sensor_event_t *)v2.first;
bool prioritize_e1 = true;
bool prioritize_e2 = true;
}
};
- std::priority_queue<void*, std::vector<void*>, compare> m_queue;
+ std::priority_queue<std::pair<void*, int>, std::vector<std::pair<void*, int>>, compare> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond_var;
~csensor_event_queue() {};
csensor_event_queue(const csensor_event_queue &) {};
csensor_event_queue& operator=(const csensor_event_queue &);
- void push_internal(void *event);
+ void push_internal(void *event, int length);
public:
static csensor_event_queue& get_instance();
- void push(const sensor_event_t &event);
- void push(sensor_event_t *event);
- void push(const sensorhub_event_t &event);
- void push(sensorhub_event_t *event);
-
- void* pop(void);
+ template<typename T> void push(const T &event);
+ template<typename T> void push(T *event);
+ void* pop(int *length);
};
+template<typename T>
+void csensor_event_queue::push(const T &event)
+{
+ void *new_event = malloc(sizeof(event));
+ if (!new_event)
+ return;
+ memcpy(new_event, &event, sizeof(event));
+ push_internal(new_event, sizeof(event));
+}
+
+template<typename T>
+void csensor_event_queue::push(T *event)
+{
+ push_internal(event, sizeof(event));
+}
#endif