sensord: modify event queue & dispatcher to support new sensor event 27/57627/2
authorkibak.yoon <kibak.yoon@samsung.com>
Thu, 21 Jan 2016 04:35:28 +0000 (13:35 +0900)
committerkibak.yoon <kibak.yoon@samsung.com>
Thu, 21 Jan 2016 08:05:22 +0000 (17:05 +0900)
structures

Change-Id: I4745524f042f7ba1ab3130928e63c33ef8d47a87
Signed-off-by: kibak.yoon <kibak.yoon@samsung.com>
src/server/csensor_event_dispatcher.cpp
src/server/csensor_event_dispatcher.h
src/server/csensor_event_queue.cpp
src/server/csensor_event_queue.h

index 7408520..06eb8a8 100755 (executable)
  */
 
 #include <csensor_event_dispatcher.h>
-#include <sensor_plugin_loader.h>
 #include <sensor_logs.h>
 #include <sf_common.h>
 #include <thread>
-#include <vector>
 
 using std::thread;
 using std::vector;
+using std::pair;
 
 #define MAX_PENDING_CONNECTION 32
 
@@ -130,9 +129,6 @@ void csensor_event_dispatcher::accept_connections(void)
 
 void csensor_event_dispatcher::dispatch_event(void)
 {
-       const int MAX_EVENT_PER_SENSOR = 16;
-       const int MAX_SENSOR_EVENT = 1 + (sensor_plugin_loader::get_instance().get_virtual_sensors().size()
-               * MAX_EVENT_PER_SENSOR);
        const int MAX_SYNTH_PER_SENSOR = 5;
 
        vector<sensor_event_t> v_sensor_events(MAX_SYNTH_PER_SENSOR);
@@ -141,8 +137,8 @@ void csensor_event_dispatcher::dispatch_event(void)
 
        while (true) {
                bool is_hub_event = false;
-
-               void *seed_event = get_event_queue().pop();
+               int seed_event_len = 0;
+               void *seed_event = get_event_queue().pop(&seed_event_len);
                unsigned int event_type = *((unsigned int *)(seed_event));
 
                if (is_sensorhub_event(event_type))
@@ -150,11 +146,10 @@ void csensor_event_dispatcher::dispatch_event(void)
 
                if (is_hub_event) {
                        sensorhub_event_t *sensorhub_event = (sensorhub_event_t *)seed_event;
-                       send_sensor_events(sensorhub_event, 1, true);
+                       send_sensorhub_events(sensorhub_event);
                } else {
-                       sensor_event_t sensor_events[MAX_SENSOR_EVENT];
-                       unsigned int event_cnt = 0;
-                       sensor_events[event_cnt++] = *((sensor_event_t *)seed_event);
+                       vector< pair<void*, int> > sensor_events;
+                       sensor_events.push_back(pair<void*, int>(seed_event, seed_event_len));
 
                        virtual_sensors v_sensors = get_active_virtual_sensors();
 
@@ -166,55 +161,48 @@ void csensor_event_dispatcher::dispatch_event(void)
                                (*it_v_sensor)->synthesize(*((sensor_event_t *)seed_event), v_sensor_events);
                                synthesized_cnt = v_sensor_events.size();
 
-                               for (int i = 0; i < synthesized_cnt; ++i)
-                                       sensor_events[event_cnt++] = v_sensor_events[i];
+                               for (int i = 0; i < synthesized_cnt; ++i) {
+                                       sensor_event_t *v_event = (sensor_event_t*)malloc(sizeof(sensor_event_t));
+                                       if (!v_event) {
+                                               ERR("Failed to allocate memory");
+                                               continue;
+                                       }
+
+                                       memcpy(v_event, &v_sensor_events[i], sizeof(sensor_event_t));
+                                       sensor_events.push_back(pair<void*, int>(v_event, sizeof(sensor_event_t)));
+                               }
 
                                ++it_v_sensor;
                        }
 
-                       sort_sensor_events(sensor_events, event_cnt);
+                       sort_sensor_events(sensor_events);
 
-                       for (unsigned int i = 0; i < event_cnt; ++i) {
-                               if (is_record_event(sensor_events[i].event_type))
-                                       put_last_event(sensor_events[i].event_type, sensor_events[i]);
+                       for (unsigned int i = 0; i < sensor_events.size(); ++i) {
+                               if (is_record_event(((sensor_event_t*)(sensor_events[i].first))->event_type))
+                                       put_last_event(((sensor_event_t*)(sensor_events[i].first))->event_type, *((sensor_event_t*)(sensor_events[i].first)));
                        }
 
-                       send_sensor_events(sensor_events, event_cnt, false);
+                       send_sensor_events(sensor_events);
                }
-
-               if (is_hub_event)
-                       delete (sensorhub_event_t *)seed_event;
-               else
-                       delete (sensor_event_t *)seed_event;
        }
 }
 
 
-void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, bool is_hub_event)
+void csensor_event_dispatcher::send_sensor_events(vector< pair<void*, int> > &events)
 {
        sensor_event_t *sensor_events = NULL;
-       sensorhub_event_t *sensor_hub_events = NULL;
        cclient_info_manager& client_info_manager = get_client_info_manager();
 
        const int RESERVED_CLIENT_CNT = 20;
        static client_id_vec id_vec(RESERVED_CLIENT_CNT);
 
-       if (is_hub_event)
-               sensor_hub_events = (sensorhub_event_t *)events;
-       else
-               sensor_events = (sensor_event_t *)events;
-
-       for (int i = 0; i < event_cnt; ++i) {
+       for (unsigned int i = 0; i < events.size(); ++i) {
                sensor_id_t sensor_id;
                unsigned int event_type;
-
-               if (is_hub_event) {
-                       sensor_id = sensor_hub_events[i].sensor_id;
-                       event_type = sensor_hub_events[i].event_type;
-               } else {
-                       sensor_id = sensor_events[i].sensor_id;
-                       event_type = sensor_events[i].event_type;
-               }
+               sensor_events = (sensor_event_t*)events[i].first;
+               int length = events[i].second;
+               sensor_id = sensor_events->sensor_id;
+               event_type = sensor_events->event_type;
 
                id_vec.clear();
                client_info_manager.get_listener_ids(sensor_id, event_type, id_vec);
@@ -223,17 +211,8 @@ void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, b
 
                while (it_client_id != id_vec.end()) {
                        csocket client_socket;
-                       bool ret;
-
-                       if (!client_info_manager.get_event_socket(*it_client_id, client_socket)) {
-                               ++it_client_id;
-                               continue;
-                       }
-
-                       if (is_hub_event)
-                               ret = (client_socket.send(sensor_hub_events + i, sizeof(sensorhub_event_t)) > 0);
-                       else
-                               ret = (client_socket.send(sensor_events + i, sizeof(sensor_event_t)) > 0);
+                       client_info_manager.get_event_socket(*it_client_id, client_socket);
+                       bool ret = (client_socket.send(sensor_events, length) > 0);
 
                        if (ret)
                                DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
@@ -242,9 +221,48 @@ void csensor_event_dispatcher::send_sensor_events(void* events, int event_cnt, b
 
                        ++it_client_id;
                }
+
+               free(sensor_events);
        }
 }
 
+void csensor_event_dispatcher::send_sensorhub_events(void* events)
+{
+       sensorhub_event_t *sensor_hub_events;
+       cclient_info_manager& client_info_manager = get_client_info_manager();
+
+       const int RESERVED_CLIENT_CNT = 20;
+       static client_id_vec id_vec(RESERVED_CLIENT_CNT);
+
+       sensor_hub_events = (sensorhub_event_t *)events;
+
+       sensor_id_t sensor_id;
+       unsigned int event_type;
+
+       sensor_id = sensor_hub_events->sensor_id;
+       event_type = sensor_hub_events->event_type;
+
+       id_vec.clear();
+       client_info_manager.get_listener_ids(sensor_id, event_type, id_vec);
+
+       auto it_client_id = id_vec.begin();
+
+       while (it_client_id != id_vec.end()) {
+               csocket client_socket;
+               client_info_manager.get_event_socket(*it_client_id, client_socket);
+               bool ret = (client_socket.send(sensor_hub_events, sizeof(sensorhub_event_t)) > 0);
+
+               if (ret)
+                       DBG("Event[0x%x] sent to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
+               else
+                       ERR("Failed to send event[0x%x] to %s on socket[%d]", event_type, client_info_manager.get_client_info(*it_client_id), client_socket.get_socket_fd());
+
+               ++it_client_id;
+       }
+
+       free(sensor_hub_events);
+}
+
 cclient_info_manager& csensor_event_dispatcher::get_client_info_manager(void)
 {
        return cclient_info_manager::get_instance();
@@ -296,16 +314,17 @@ virtual_sensors csensor_event_dispatcher::get_active_virtual_sensors(void)
        return m_active_virtual_sensors;
 }
 
-void csensor_event_dispatcher::sort_sensor_events(sensor_event_t *events, unsigned int cnt)
+struct sort_comp {
+       bool operator()(const pair<void*, int> &left, const pair<void*, int> &right) {
+               return ((sensor_event_t*)(left.first))->data.timestamp < ((sensor_event_t*)(right.first))->data.timestamp;
+       }
+};
+
+void csensor_event_dispatcher::sort_sensor_events(vector< pair<void*, int> > &events)
 {
-       std::sort(events, events + cnt,
-               [](const sensor_event_t& a, const sensor_event_t &b)->bool {
-                       return a.data.timestamp < b.data.timestamp;
-               }
-       );
+       std::sort(events.begin(), events.end(), sort_comp());
 }
 
-
 void csensor_event_dispatcher::request_last_event(int client_id, sensor_id_t sensor_id)
 {
        cclient_info_manager& client_info_manager = get_client_info_manager();
index 27131c2..bb40ac2 100755 (executable)
@@ -27,6 +27,8 @@
 #include <virtual_sensor.h>
 #include <unordered_map>
 #include <list>
+#include <vector>
+
 
 typedef std::unordered_map<unsigned int, sensor_event_t> event_type_last_event_map;
 typedef std::list<virtual_sensor *> virtual_sensors;
@@ -51,7 +53,8 @@ private:
        void accept_event_channel(csocket client_socket);
 
        void dispatch_event(void);
-       void send_sensor_events(void* events, int event_cnt, bool is_hub_event);
+       void send_sensor_events(std::vector< std::pair<void*, int> > &events);
+       void send_sensorhub_events(void* events);
        static cclient_info_manager& get_client_info_manager(void);
        static csensor_event_queue& get_event_queue(void);
 
@@ -62,7 +65,7 @@ private:
        bool has_active_virtual_sensor(virtual_sensor *sensor);
        virtual_sensors get_active_virtual_sensors(void);
 
-       void sort_sensor_events(sensor_event_t *events, unsigned int cnt);
+       void sort_sensor_events(std::vector< std::pair<void*, int> > &events);
 public:
        static csensor_event_dispatcher& get_instance();
        bool run(void);
index f75fa42..bc907e1 100644 (file)
@@ -26,63 +26,31 @@ csensor_event_queue& csensor_event_queue::get_instance()
        return inst;
 }
 
-void csensor_event_queue::push(const sensor_event_t &event)
-{
-       sensor_event_t *new_event = new(std::nothrow) sensor_event_t;
-       retm_if(!new_event, "Failed to allocate memory");
-       *new_event = event;
-
-       push_internal(new_event);
-}
-
-void csensor_event_queue::push(sensor_event_t *event)
-{
-       push_internal(event);
-}
-
-void csensor_event_queue::push(const sensorhub_event_t &event)
-{
-       sensorhub_event_t *new_event = new(std::nothrow) sensorhub_event_t;
-       retm_if(!new_event, "Failed to allocate memory");
-       *new_event = event;
-
-       push_internal(new_event);
-}
-
-void csensor_event_queue::push(sensorhub_event_t *event)
-{
-       push_internal(event);
-}
-
-void csensor_event_queue::push_internal(void *event)
+void csensor_event_queue::push_internal(void *event, int length)
 {
        lock l(m_mutex);
        bool wake = m_queue.empty();
 
        if (m_queue.size() >= QUEUE_FULL_SIZE) {
                ERR("Queue is full, drop it!");
-
-               unsigned int event_type = *((unsigned int *)(event));
-
-               if (is_sensorhub_event(event_type))
-                       delete (sensorhub_event_t *)event;
-               else
-                       delete (sensor_event_t *)event;
+               free(event);
        } else
-               m_queue.push(event);
+               m_queue.push(std::pair<void*, int>(event, length));
 
        if (wake)
                m_cond_var.notify_one();
 }
 
-void* csensor_event_queue::pop(void)
+void* csensor_event_queue::pop(int *length)
 {
        ulock u(m_mutex);
        while (m_queue.empty())
                m_cond_var.wait(u);
 
-       void* event = m_queue.top();
+       std::pair<void*, int> event = m_queue.top();
        m_queue.pop();
-       return event;
+
+       *length = event.second;
+       return event.first;
 }
 
index 8ae00d2..a72ca4a 100644 (file)
@@ -20,6 +20,8 @@
 #if !defined(_CSENSOR_EVENT_QUEUE_CLASS_H_)
 #define _CSENSOR_EVENT_QUEUE_CLASS_H_
 #include <sf_common.h>
+#include <cstring>
+#include <utility>
 #include <queue>
 #include <mutex>
 #include <condition_variable>
@@ -34,9 +36,9 @@ private:
 
        class compare {
        public:
-               bool operator() (void* &v1,void *&v2) {
-                       sensor_event_t *e1 = (sensor_event_t *)v1;
-                       sensor_event_t *e2 = (sensor_event_t *)v2;
+               bool operator() (std::pair<void *, int> v1, std::pair<void *, int> v2) {
+                       sensor_event_t *e1 = (sensor_event_t *)v1.first;
+                       sensor_event_t *e2 = (sensor_event_t *)v2.first;
                        bool prioritize_e1 = true;
                        bool prioritize_e2 = true;
 
@@ -72,7 +74,7 @@ private:
                }
        };
 
-       std::priority_queue<void*, std::vector<void*>, compare> m_queue;
+       std::priority_queue<std::pair<void*, int>, std::vector<std::pair<void*, int>>, compare> m_queue;
 
        std::mutex m_mutex;
        std::condition_variable m_cond_var;
@@ -84,15 +86,27 @@ private:
        ~csensor_event_queue() {};
        csensor_event_queue(const csensor_event_queue &) {};
        csensor_event_queue& operator=(const csensor_event_queue &);
-       void push_internal(void *event);
+       void push_internal(void *event, int length);
 public:
        static csensor_event_queue& get_instance();
-       void push(const sensor_event_t &event);
-       void push(sensor_event_t *event);
-       void push(const sensorhub_event_t &event);
-       void push(sensorhub_event_t *event);
-
-       void* pop(void);
+       template<typename T> void push(const T &event);
+       template<typename T> void push(T *event);
+       void* pop(int *length);
 };
 
+template<typename T>
+void csensor_event_queue::push(const T &event)
+{
+       void *new_event = malloc(sizeof(event));
+       if (!new_event)
+               return;
+       memcpy(new_event, &event, sizeof(event));
+       push_internal(new_event, sizeof(event));
+}
+
+template<typename T>
+void csensor_event_queue::push(T *event)
+{
+       push_internal(event, sizeof(event));
+}
 #endif