#include <sensor-log-private.h>
#include <sensor-types-private.h>
#include <command-types.h>
+#include <lock.h>
+
+#include <unordered_map>
using namespace sensor;
-class listener_handler : public ipc::channel_handler
-{
-public:
- listener_handler(sensor_listener *listener)
- : m_listener(listener)
- {
- evt_handler[0] = evt_handler[1] = evt_handler[2] = evt_handler[3] = NULL;
- }
+struct listener_info {
+ sensor_listener *listener;
+ cmutex lock;
+};
- void connected(ipc::channel *ch) {}
- void disconnected(ipc::channel *ch)
- {
- /* If channel->disconnect() is not explicitly called,
- * listener will be restored */
- if (m_listener)
- m_listener->restore();
- }
+static std::unordered_map<int, listener_info> listeners;
+static int listener_id;
- void disconnect(void)
- {
- m_listener = NULL;
- }
+static gboolean listener_handler(GIOChannel *ch, GIOCondition condition, gpointer data)
+{
+ int id = (long)data;
+ Autolock(listeners[id].lock);
- void read(ipc::channel *ch, ipc::message &msg)
- {
- ipc::channel_handler *handler = NULL;
- switch (msg.header()->type) {
- case CMD_LISTENER_EVENT:
- handler = evt_handler[0];
- if (handler)
- handler->read(ch, msg);
- break;
- case CMD_LISTENER_ACC_EVENT:
- handler = evt_handler[1];
- if (handler)
- handler->read(ch, msg);
- break;
- case CMD_LISTENER_SET_ATTR_INT:
- handler = evt_handler[2];
- if (handler)
- handler->read(ch, msg);
- break;
- case CMD_LISTENER_SET_ATTR_STR:
- handler = evt_handler[3];
- if (handler)
- handler->read(ch, msg);
- break;
- case CMD_LISTENER_CONNECTED:
- // Do nothing
- break;
- default:
- _W("Invalid command message");
- }
+ sensor_listener *listener = listeners[id].listener;
+ if (!listener) {
+ listeners.erase(id);
+ return false;
}
- void set_handler(int num, ipc::channel_handler* handler) {
- evt_handler[num] = handler;
+ unsigned int cond = (unsigned int)condition;
+ if (cond & (G_IO_HUP)) {
+ listener->restore();
+ return false;
}
- void read_complete(ipc::channel *ch) {}
- void error_caught(ipc::channel *ch, int error) {}
+ ipc::message msg;
+ bool ret = listener->read(msg);
+ if (!ret)
+ return false;
-private:
- ipc::channel_handler *evt_handler[4];
- sensor_listener *m_listener;
-};
+ listener->handle(msg);
-sensor_listener::sensor_listener(sensor_t sensor)
-: m_id(0)
-, m_sensor(reinterpret_cast<sensor_info *>(sensor))
-, m_cmd_channel(NULL)
-, m_evt_channel(NULL)
-, m_handler(NULL)
-, m_evt_handler(NULL)
-, m_acc_handler(NULL)
-, m_attr_int_changed_handler(NULL)
-, m_attr_str_changed_handler(NULL)
-, m_connected(false)
-, m_started(false)
-{
- init();
+ return true;
}
-sensor_listener::sensor_listener(sensor_t sensor, ipc::event_loop *loop)
+sensor_listener::sensor_listener(sensor_t sensor, GMainLoop *loop)
: m_id(0)
+, l_id(listener_id++)
, m_sensor(reinterpret_cast<sensor_info *>(sensor))
-, m_cmd_channel(NULL)
-, m_evt_channel(NULL)
-, m_handler(NULL)
+, m_cmd_socket(NULL)
+, m_evt_socket(NULL)
, m_evt_handler(NULL)
, m_acc_handler(NULL)
, m_attr_int_changed_handler(NULL)
bool sensor_listener::init(void)
{
- m_handler = new(std::nothrow) listener_handler(this);
- if (!m_handler) {
- _E("Failed to allocate memory");
+ if (!connect())
return false;
- }
- if (!connect()) {
- delete m_handler;
- m_handler = NULL;
- return false;
- }
+ listeners[l_id].listener = this;
return true;
}
void sensor_listener::deinit(void)
{
- AUTOLOCK(lock);
+ Autolock(listeners[l_id].lock);
_D("Deinitializing..");
stop();
disconnect();
unset_attribute_int_changed_handler();
unset_attribute_str_changed_handler();
- m_handler->disconnect();
- m_loop->add_channel_handler_release_list(m_handler);
- m_handler = NULL;
+ listeners[l_id].listener = NULL;
m_attributes_int.clear();
m_attributes_str.clear();
void sensor_listener::restore(void)
{
- if (lock.try_lock())
- return;
+ Autolock(listeners[l_id].lock);
- m_cmd_channel->disconnect();
- delete m_cmd_channel;
- m_cmd_channel = NULL;
+ disconnect();
retm_if(!connect(), "Failed to restore listener");
set_attribute(SENSORD_ATTRIBUTE_PAUSE_POLICY, m_attributes_int[SENSORD_ATTRIBUTE_PAUSE_POLICY]);
_D("Restored listener[%d]", get_id());
- lock.unlock();
}
bool sensor_listener::connect(void)
{
- m_cmd_channel = new(std::nothrow) ipc::channel();
- retvm_if(!m_cmd_channel, false, "Failed to allocate memory");
- m_cmd_channel->connect(NULL, NULL, true);
-
- m_evt_channel = new(std::nothrow) ipc::channel();
- retvm_if(!m_evt_channel, false, "Failed to allocate memory");
- m_evt_channel->connect(m_handler, m_loop, false);
-
+ GIOChannel *ch = NULL;
ipc::message msg;
ipc::message reply;
cmd_listener_connect_t buf = {0, };
+ m_cmd_socket = new(std::nothrow) ipc::socket();
+ retvm_if(!m_cmd_socket, false, "Failed to allocate memory");
+
+ if (!m_cmd_socket->create(SENSOR_CHANNEL_PATH))
+ goto fail;
+
+ if (!m_cmd_socket->connect())
+ goto fail;
+
+ m_evt_socket = new(std::nothrow) ipc::socket();
+ if (!m_evt_socket) {
+ _E("Failed to allocate memory");
+ goto fail;
+ }
+
+ if (!m_evt_socket->create(SENSOR_CHANNEL_PATH))
+ goto fail;
+
+ if (!m_evt_socket->connect())
+ goto fail;
+
memcpy(buf.sensor, m_sensor->get_uri().c_str(), m_sensor->get_uri().size());
msg.set_type(CMD_LISTENER_CONNECT);
msg.enclose((const char *)&buf, sizeof(buf));
- m_evt_channel->send_sync(msg);
+ send(m_evt_socket, msg);
- m_evt_channel->read_sync(reply);
+ read(m_evt_socket, reply);
reply.disclose((char *)&buf, sizeof(buf));
m_id = buf.listener_id;
m_connected.store(true);
- m_evt_channel->bind();
+ ch = g_io_channel_unix_new(m_evt_socket->get_fd());
+ retvm_if(!ch, false, "Failed to create g_io_channel_unix_new");
+
+ g_src = g_io_create_watch(ch, (GIOCondition) (ipc::EVENT_IN | ipc::EVENT_HUP | ipc::EVENT_NVAL));
+ g_io_channel_unref(ch);
+ if (!g_src) {
+ _E("Failed to create g_io_create_watch");
+ goto fail;
+ }
+
+ g_source_set_callback(g_src, (GSourceFunc) listener_handler, (gpointer)l_id, NULL);
+ g_source_attach(g_src, g_main_loop_get_context(m_loop));
+ g_source_unref(g_src);
_I("Connected listener[%d] with sensor[%s]", get_id(), m_sensor->get_uri().c_str());
return true;
+
+fail:
+ delete m_evt_socket;
+ m_evt_socket = NULL;
+
+ delete m_cmd_socket;
+ m_cmd_socket = NULL;
+
+ return false;
}
void sensor_listener::disconnect(void)
_D("Disconnecting..");
- m_loop->add_channel_release_queue(m_evt_channel);
- m_evt_channel = NULL;
+ if (g_src && !g_source_is_destroyed(g_src)) {
+ g_source_destroy(g_src);
+ g_src = NULL;
+ }
+
+ delete m_evt_socket;
+ m_evt_socket = NULL;
- m_cmd_channel->disconnect();
- delete m_cmd_channel;
- m_cmd_channel = NULL;
+ delete m_cmd_socket;
+ m_cmd_socket = NULL;
_I("Disconnected[%d]", get_id());
}
void sensor_listener::set_event_handler(ipc::channel_handler *handler)
{
- m_handler->set_handler(0, handler);
- if (m_evt_handler)
- m_loop->add_channel_handler_release_list(m_evt_handler);
+ delete m_evt_handler;
m_evt_handler = handler;
}
void sensor_listener::unset_event_handler(void)
{
- if (m_evt_handler) {
- m_handler->set_handler(0, NULL);
- m_loop->add_channel_handler_release_list(m_evt_handler);
- m_evt_handler = NULL;
- }
+ delete m_evt_handler;
+ m_evt_handler = NULL;
}
ipc::channel_handler *sensor_listener::get_accuracy_handler(void)
void sensor_listener::set_accuracy_handler(ipc::channel_handler *handler)
{
- m_handler->set_handler(1, handler);
- if (m_acc_handler)
- m_loop->add_channel_handler_release_list(m_acc_handler);
+ delete m_acc_handler;
m_acc_handler = handler;
}
void sensor_listener::unset_accuracy_handler(void)
{
- if (m_acc_handler) {
- m_handler->set_handler(1, NULL);
- m_loop->add_channel_handler_release_list(m_acc_handler);
- m_acc_handler = NULL;
- }
+ delete m_acc_handler;
+ m_acc_handler = NULL;
}
ipc::channel_handler *sensor_listener::get_attribute_int_changed_handler(void)
void sensor_listener::set_attribute_int_changed_handler(ipc::channel_handler *handler)
{
- m_handler->set_handler(2, handler);
- if (m_attr_int_changed_handler)
- m_loop->add_channel_handler_release_list(m_attr_int_changed_handler);
+ delete m_attr_int_changed_handler;
m_attr_int_changed_handler = handler;
}
void sensor_listener::unset_attribute_int_changed_handler(void)
{
- if (m_attr_int_changed_handler) {
- m_handler->set_handler(2, NULL);
- m_loop->add_channel_handler_release_list(m_attr_int_changed_handler);
- m_attr_int_changed_handler = NULL;
- }
+ delete m_attr_int_changed_handler;
+ m_attr_int_changed_handler = NULL;
}
ipc::channel_handler *sensor_listener::get_attribute_str_changed_handler(void)
void sensor_listener::set_attribute_str_changed_handler(ipc::channel_handler *handler)
{
- m_handler->set_handler(3, handler);
- if (m_attr_str_changed_handler)
- m_loop->add_channel_handler_release_list(m_attr_str_changed_handler);
+ delete m_attr_str_changed_handler;
m_attr_str_changed_handler = handler;
}
void sensor_listener::unset_attribute_str_changed_handler(void)
{
- if (m_attr_str_changed_handler) {
- m_handler->set_handler(3, NULL);
- m_loop->add_channel_handler_release_list(m_attr_str_changed_handler);
- m_attr_str_changed_handler = NULL;
- }
+ delete m_attr_str_changed_handler;
+ m_attr_str_changed_handler = NULL;
}
int sensor_listener::start(void)
ipc::message reply;
cmd_listener_start_t buf = {0, };
- retvm_if(!m_cmd_channel, -EINVAL, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EINVAL, "Failed to connect to server");
buf.listener_id = m_id;
msg.set_type(CMD_LISTENER_START);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0) {
_E("Failed to start listener[%d], sensor[%s]", get_id(), m_sensor->get_uri().c_str());
ipc::message reply;
cmd_listener_stop_t buf = {0, };
- retvm_if(!m_cmd_channel, -EINVAL, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EINVAL, "Failed to connect to server");
retvm_if(!m_started.load(), -EAGAIN, "Already stopped");
buf.listener_id = m_id;
msg.set_type(CMD_LISTENER_STOP);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0) {
_E("Failed to stop listener[%d]", get_id());
ipc::message reply;
cmd_listener_attr_int_t buf = {0, };
- retvm_if(!m_cmd_channel, -EIO, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EIO, "Failed to connect to server");
buf.listener_id = m_id;
buf.attribute = attribute;
msg.set_type(CMD_LISTENER_SET_ATTR_INT);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0)
return reply.header()->err;
ipc::message reply;
cmd_listener_attr_int_t buf = {0, };
- retvm_if(!m_cmd_channel, -EIO, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EIO, "Failed to connect to server");
buf.listener_id = m_id;
buf.attribute = attribute;
msg.set_type(CMD_LISTENER_GET_ATTR_INT);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0) {
return reply.header()->err;
void sensor_listener::update_attribute(int attribute, int value)
{
- AUTOLOCK(lock);
+ Autolock(listeners[l_id].lock);
m_attributes_int[attribute] = value;
_I("Update_attribute(int) listener[%d] attribute[%d] value[%d] attributes size[%d]", get_id(), attribute, value, m_attributes_int.size());
}
cmd_listener_attr_str_t *buf;
size_t size;
- retvm_if(!m_cmd_channel, -EIO, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EIO, "Failed to connect to server");
size = sizeof(cmd_listener_attr_str_t) + len;
msg.enclose((char *)buf, size);
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
/* Message memory is released automatically after sending message,
so it doesn't need to free memory */
msg.set_type(CMD_LISTENER_GET_ATTR_STR);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0) {
return reply.header()->err;
}
void sensor_listener::update_attribute(int attribute, const char *value, int len)
{
- AUTOLOCK(lock);
+ Autolock(listeners[l_id].lock);
m_attributes_str[attribute].clear();
m_attributes_str[attribute].insert(m_attributes_str[attribute].begin(), value, value + len);
_I("Update_attribute(str) listener[%d] attribute[%d] value[%s] attributes size[%zu]", get_id(), attribute, value, m_attributes_int.size());
ipc::message reply;
cmd_listener_get_data_t buf = {0, };
- retvm_if(!m_cmd_channel, -EIO, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EIO, "Failed to connect to server");
buf.listener_id = m_id;
msg.set_type(CMD_LISTENER_GET_DATA);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0) {
return OP_ERROR;
ipc::message reply;
cmd_listener_get_data_list_t buf = {0, };
- retvm_if(!m_cmd_channel, -EIO, "Failed to connect to server");
+ retvm_if(!m_cmd_socket, -EIO, "Failed to connect to server");
buf.listener_id = m_id;
msg.set_type(CMD_LISTENER_GET_DATA_LIST);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(msg);
- m_cmd_channel->read_sync(reply);
+ send(m_cmd_socket, msg);
+ read(m_cmd_socket, reply);
if (reply.header()->err < 0) {
return reply.header()->err;
delete [] reply_buf;
return OP_SUCCESS;
}
+
+
+bool sensor_listener::read(ipc::message &msg)
+{
+ return read(m_evt_socket, msg);
+}
+
+bool sensor_listener::read(ipc::socket *m_socket, ipc::message &msg)
+{
+ Autolock(listeners[l_id].lock);
+ if (!m_socket) {
+ _E("Socket is not connected");
+ return false;
+ }
+
+ ipc::message_header header;
+ ssize_t size = 0;
+ char buf[MAX_MSG_CAPACITY];
+
+ /* header */
+ size = m_socket->recv(&header, sizeof(ipc::message_header), false);
+ if (size <= 0)
+ return false;
+
+ // check error from header
+ if (header.err != 0) {
+ msg.header()->err = header.err;
+ return false;
+ }
+
+ /* body */
+ if (header.length >= MAX_MSG_CAPACITY) {
+ _E("header.length error %u", header.length);
+ return false;
+ }
+
+ if (header.length > 0) {
+ size = m_socket->recv(&buf, header.length, false);
+ if (size <= 0)
+ return false;
+ }
+
+ buf[header.length] = '\0';
+ msg.enclose(reinterpret_cast<const void *>(buf), header.length);
+ msg.set_type(header.type);
+ msg.header()->err = header.err;
+
+ return true;
+}
+
+bool sensor_listener::send(ipc::socket *m_socket, ipc::message &msg)
+{
+ Autolock(listeners[l_id].lock);
+ if (!m_socket) {
+ _E("Socket is not connected");
+ return false;
+ }
+
+ retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
+
+ ssize_t size = 0;
+ char *buf = msg.body();
+
+ /* header */
+ size = m_socket->send(reinterpret_cast<void *>(msg.header()),
+ sizeof(ipc::message_header), true);
+ retvm_if(size <= 0, false, "Failed to send header");
+
+ /* if body size is zero, skip to send body message */
+ retv_if(msg.size() == 0, true);
+
+ /* body */
+ size = m_socket->send(buf, msg.size(), true);
+ retvm_if(size <= 0, false, "Failed to send body");
+
+ return true;
+}
+
+void sensor_listener::handle(ipc::message &msg)
+{
+ ipc::channel_handler *handler = NULL;
+ switch (msg.header()->type) {
+ case CMD_LISTENER_EVENT:
+ handler = m_evt_handler;
+ break;
+ case CMD_LISTENER_ACC_EVENT:
+ handler = m_acc_handler;
+ break;
+ case CMD_LISTENER_SET_ATTR_INT:
+ handler = m_attr_int_changed_handler;
+ break;
+ case CMD_LISTENER_SET_ATTR_STR:
+ handler = m_attr_str_changed_handler;
+ break;
+ case CMD_LISTENER_CONNECTED:
+ // Do nothing
+ break;
+ default:
+ _W("Invalid command message");
+ }
+
+ if (handler)
+ handler->read(NULL, msg);
+}