From 1f37a109887306071e679353255ff9eb764436f4 Mon Sep 17 00:00:00 2001 From: "kibak.yoon" Date: Thu, 21 Jan 2016 13:35:28 +0900 Subject: [PATCH] sensord: modify event queue & dispatcher to support new sensor event structures Change-Id: I4745524f042f7ba1ab3130928e63c33ef8d47a87 Signed-off-by: kibak.yoon --- src/server/csensor_event_dispatcher.cpp | 133 ++++++++++++++++++-------------- src/server/csensor_event_dispatcher.h | 7 +- src/server/csensor_event_queue.cpp | 48 ++---------- src/server/csensor_event_queue.h | 36 ++++++--- 4 files changed, 114 insertions(+), 110 deletions(-) diff --git a/src/server/csensor_event_dispatcher.cpp b/src/server/csensor_event_dispatcher.cpp index 7408520..06eb8a8 100755 --- a/src/server/csensor_event_dispatcher.cpp +++ b/src/server/csensor_event_dispatcher.cpp @@ -18,14 +18,13 @@ */ #include -#include #include #include #include -#include using std::thread; using std::vector; +using std::pair; #define MAX_PENDING_CONNECTION 32 @@ -130,9 +129,6 @@ void csensor_event_dispatcher::accept_connections(void) void csensor_event_dispatcher::dispatch_event(void) { - const int MAX_EVENT_PER_SENSOR = 16; - const int MAX_SENSOR_EVENT = 1 + (sensor_plugin_loader::get_instance().get_virtual_sensors().size() - * MAX_EVENT_PER_SENSOR); const int MAX_SYNTH_PER_SENSOR = 5; vector v_sensor_events(MAX_SYNTH_PER_SENSOR); @@ -141,8 +137,8 @@ void csensor_event_dispatcher::dispatch_event(void) while (true) { bool is_hub_event = false; - - void *seed_event = get_event_queue().pop(); + int seed_event_len = 0; + void *seed_event = get_event_queue().pop(&seed_event_len); unsigned int event_type = *((unsigned int *)(seed_event)); if (is_sensorhub_event(event_type)) @@ -150,11 +146,10 @@ void csensor_event_dispatcher::dispatch_event(void) if (is_hub_event) { sensorhub_event_t *sensorhub_event = (sensorhub_event_t *)seed_event; - send_sensor_events(sensorhub_event, 1, true); + send_sensorhub_events(sensorhub_event); } else { - sensor_event_t sensor_events[MAX_SENSOR_EVENT]; - unsigned int event_cnt = 0; - sensor_events[event_cnt++] = *((sensor_event_t *)seed_event); + vector< pair > sensor_events; + sensor_events.push_back(pair(seed_event, seed_event_len)); virtual_sensors v_sensors = get_active_virtual_sensors(); @@ -166,55 +161,48 @@ void csensor_event_dispatcher::dispatch_event(void) (*it_v_sensor)->synthesize(*((sensor_event_t *)seed_event), v_sensor_events); synthesized_cnt = v_sensor_events.size(); - for (int i = 0; i < synthesized_cnt; ++i) - sensor_events[event_cnt++] = v_sensor_events[i]; + for (int i = 0; i < synthesized_cnt; ++i) { + sensor_event_t *v_event = (sensor_event_t*)malloc(sizeof(sensor_event_t)); + if (!v_event) { + ERR("Failed to allocate memory"); + continue; + } + + memcpy(v_event, &v_sensor_events[i], sizeof(sensor_event_t)); + sensor_events.push_back(pair(v_event, sizeof(sensor_event_t))); + } ++it_v_sensor; } - sort_sensor_events(sensor_events, event_cnt); + sort_sensor_events(sensor_events); - for (unsigned int i = 0; i < event_cnt; ++i) { - if (is_record_event(sensor_events[i].event_type)) - put_last_event(sensor_events[i].event_type, sensor_events[i]); + for (unsigned int i = 0; i < sensor_events.size(); ++i) { + if (is_record_event(((sensor_event_t*)(sensor_events[i].first))->event_type)) + put_last_event(((sensor_event_t*)(sensor_events[i].first))->event_type, *((sensor_event_t*)(sensor_events[i].first))); } - send_sensor_events(sensor_events, event_cnt, false); + send_sensor_events(sensor_events); } - - if (is_hub_event) - delete (sensorhub_event_t *)seed_event; - else - delete (sensor_event_t *)seed_event; } } -void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, bool is_hub_event) +void csensor_event_dispatcher::send_sensor_events(vector< pair > &events) { sensor_event_t *sensor_events = NULL; - sensorhub_event_t *sensor_hub_events = NULL; cclient_info_manager& client_info_manager = get_client_info_manager(); const int RESERVED_CLIENT_CNT = 20; static client_id_vec id_vec(RESERVED_CLIENT_CNT); - if (is_hub_event) - sensor_hub_events = (sensorhub_event_t *)events; - else - sensor_events = (sensor_event_t *)events; - - for (int i = 0; i < event_cnt; ++i) { + for (unsigned int i = 0; i < events.size(); ++i) { sensor_id_t sensor_id; unsigned int event_type; - - if (is_hub_event) { - sensor_id = sensor_hub_events[i].sensor_id; - event_type = sensor_hub_events[i].event_type; - } else { - sensor_id = sensor_events[i].sensor_id; - event_type = sensor_events[i].event_type; - } + sensor_events = (sensor_event_t*)events[i].first; + int length = events[i].second; + sensor_id = sensor_events->sensor_id; + event_type = sensor_events->event_type; id_vec.clear(); client_info_manager.get_listener_ids(sensor_id, event_type, id_vec); @@ -223,17 +211,8 @@ void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, b while (it_client_id != id_vec.end()) { csocket client_socket; - bool ret; - - if (!client_info_manager.get_event_socket(*it_client_id, client_socket)) { - ++it_client_id; - continue; - } - - if (is_hub_event) - ret = (client_socket.send(sensor_hub_events + i, sizeof(sensorhub_event_t)) > 0); - else - ret = (client_socket.send(sensor_events + i, sizeof(sensor_event_t)) > 0); + client_info_manager.get_event_socket(*it_client_id, client_socket); + bool ret = (client_socket.send(sensor_events, length) > 0); if (ret) DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd()); @@ -242,9 +221,48 @@ void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, b ++it_client_id; } + + free(sensor_events); } } +void csensor_event_dispatcher::send_sensorhub_events(void* events) +{ + sensorhub_event_t *sensor_hub_events; + cclient_info_manager& client_info_manager = get_client_info_manager(); + + const int RESERVED_CLIENT_CNT = 20; + static client_id_vec id_vec(RESERVED_CLIENT_CNT); + + sensor_hub_events = (sensorhub_event_t *)events; + + sensor_id_t sensor_id; + unsigned int event_type; + + sensor_id = sensor_hub_events->sensor_id; + event_type = sensor_hub_events->event_type; + + id_vec.clear(); + client_info_manager.get_listener_ids(sensor_id, event_type, id_vec); + + auto it_client_id = id_vec.begin(); + + while (it_client_id != id_vec.end()) { + csocket client_socket; + client_info_manager.get_event_socket(*it_client_id, client_socket); + bool ret = (client_socket.send(sensor_hub_events, sizeof(sensorhub_event_t)) > 0); + + if (ret) + DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd()); + else + ERR("Failed to send event[0x%x] to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd()); + + ++it_client_id; + } + + free(sensor_hub_events); +} + cclient_info_manager& csensor_event_dispatcher::get_client_info_manager(void) { return cclient_info_manager::get_instance(); @@ -296,16 +314,17 @@ virtual_sensors csensor_event_dispatcher::get_active_virtual_sensors(void) return m_active_virtual_sensors; } -void csensor_event_dispatcher::sort_sensor_events(sensor_event_t *events, unsigned int cnt) +struct sort_comp { + bool operator()(const pair &left, const pair &right) { + return ((sensor_event_t*)(left.first))->data.timestamp < ((sensor_event_t*)(right.first))->data.timestamp; + } +}; + +void csensor_event_dispatcher::sort_sensor_events(vector< pair > &events) { - std::sort(events, events + cnt, - [](const sensor_event_t& a, const sensor_event_t &b)->bool { - return a.data.timestamp < b.data.timestamp; - } - ); + std::sort(events.begin(), events.end(), sort_comp()); } - void csensor_event_dispatcher::request_last_event(int client_id, sensor_id_t sensor_id) { cclient_info_manager& client_info_manager = get_client_info_manager(); diff --git a/src/server/csensor_event_dispatcher.h b/src/server/csensor_event_dispatcher.h index 27131c2..bb40ac2 100755 --- a/src/server/csensor_event_dispatcher.h +++ b/src/server/csensor_event_dispatcher.h @@ -27,6 +27,8 @@ #include #include #include +#include + typedef std::unordered_map event_type_last_event_map; typedef std::list virtual_sensors; @@ -51,7 +53,8 @@ private: void accept_event_channel(csocket client_socket); void dispatch_event(void); - void send_sensor_events(void* events, int event_cnt, bool is_hub_event); + void send_sensor_events(std::vector< std::pair > &events); + void send_sensorhub_events(void* events); static cclient_info_manager& get_client_info_manager(void); static csensor_event_queue& get_event_queue(void); @@ -62,7 +65,7 @@ private: bool has_active_virtual_sensor(virtual_sensor *sensor); virtual_sensors get_active_virtual_sensors(void); - void sort_sensor_events(sensor_event_t *events, unsigned int cnt); + void sort_sensor_events(std::vector< std::pair > &events); public: static csensor_event_dispatcher& get_instance(); bool run(void); diff --git a/src/server/csensor_event_queue.cpp b/src/server/csensor_event_queue.cpp index f75fa42..bc907e1 100644 --- a/src/server/csensor_event_queue.cpp +++ b/src/server/csensor_event_queue.cpp @@ -26,63 +26,31 @@ csensor_event_queue& csensor_event_queue::get_instance() return inst; } -void csensor_event_queue::push(const sensor_event_t &event) -{ - sensor_event_t *new_event = new(std::nothrow) sensor_event_t; - retm_if(!new_event, "Failed to allocate memory"); - *new_event = event; - - push_internal(new_event); -} - -void csensor_event_queue::push(sensor_event_t *event) -{ - push_internal(event); -} - -void csensor_event_queue::push(const sensorhub_event_t &event) -{ - sensorhub_event_t *new_event = new(std::nothrow) sensorhub_event_t; - retm_if(!new_event, "Failed to allocate memory"); - *new_event = event; - - push_internal(new_event); -} - -void csensor_event_queue::push(sensorhub_event_t *event) -{ - push_internal(event); -} - -void csensor_event_queue::push_internal(void *event) +void csensor_event_queue::push_internal(void *event, int length) { lock l(m_mutex); bool wake = m_queue.empty(); if (m_queue.size() >= QUEUE_FULL_SIZE) { ERR("Queue is full, drop it!"); - - unsigned int event_type = *((unsigned int *)(event)); - - if (is_sensorhub_event(event_type)) - delete (sensorhub_event_t *)event; - else - delete (sensor_event_t *)event; + free(event); } else - m_queue.push(event); + m_queue.push(std::pair(event, length)); if (wake) m_cond_var.notify_one(); } -void* csensor_event_queue::pop(void) +void* csensor_event_queue::pop(int *length) { ulock u(m_mutex); while (m_queue.empty()) m_cond_var.wait(u); - void* event = m_queue.top(); + std::pair event = m_queue.top(); m_queue.pop(); - return event; + + *length = event.second; + return event.first; } diff --git a/src/server/csensor_event_queue.h b/src/server/csensor_event_queue.h index 8ae00d2..a72ca4a 100644 --- a/src/server/csensor_event_queue.h +++ b/src/server/csensor_event_queue.h @@ -20,6 +20,8 @@ #if !defined(_CSENSOR_EVENT_QUEUE_CLASS_H_) #define _CSENSOR_EVENT_QUEUE_CLASS_H_ #include +#include +#include #include #include #include @@ -34,9 +36,9 @@ private: class compare { public: - bool operator() (void* &v1,void *&v2) { - sensor_event_t *e1 = (sensor_event_t *)v1; - sensor_event_t *e2 = (sensor_event_t *)v2; + bool operator() (std::pair v1, std::pair v2) { + sensor_event_t *e1 = (sensor_event_t *)v1.first; + sensor_event_t *e2 = (sensor_event_t *)v2.first; bool prioritize_e1 = true; bool prioritize_e2 = true; @@ -72,7 +74,7 @@ private: } }; - std::priority_queue, compare> m_queue; + std::priority_queue, std::vector>, compare> m_queue; std::mutex m_mutex; std::condition_variable m_cond_var; @@ -84,15 +86,27 @@ private: ~csensor_event_queue() {}; csensor_event_queue(const csensor_event_queue &) {}; csensor_event_queue& operator=(const csensor_event_queue &); - void push_internal(void *event); + void push_internal(void *event, int length); public: static csensor_event_queue& get_instance(); - void push(const sensor_event_t &event); - void push(sensor_event_t *event); - void push(const sensorhub_event_t &event); - void push(sensorhub_event_t *event); - - void* pop(void); + template void push(const T &event); + template void push(T *event); + void* pop(int *length); }; +template +void csensor_event_queue::push(const T &event) +{ + void *new_event = malloc(sizeof(event)); + if (!new_event) + return; + memcpy(new_event, &event, sizeof(event)); + push_internal(new_event, sizeof(event)); +} + +template +void csensor_event_queue::push(T *event) +{ + push_internal(event, sizeof(event)); +} #endif -- 2.7.4