Disconnect channel when peer performs shutdown
[platform/core/system/sensord.git] / src / shared / channel.cpp
index e407aee..020d5fb 100644 (file)
 #include <stdint.h>
 #include <unistd.h>
 #include <memory>
+#include <algorithm>
 
 #include "sensor_log.h"
 #include "channel_event_handler.h"
 
-#define SYSTEMD_SOCK_BUF_SIZE 40000
+#define SYSTEMD_SOCK_BUF_SIZE (128*1024)
 
 using namespace ipc;
 
 class send_event_handler : public event_handler
 {
 public:
-       send_event_handler(channel *ch, message *msg)
+       send_event_handler(channel *ch, std::shared_ptr<message> msg)
        : m_ch(ch)
        , m_msg(msg)
        { }
 
        bool handle(int fd, event_condition condition)
        {
-               if (!m_ch || !m_ch->is_connected())
+               if (!m_ch) {
                        return false;
+               }
 
-               if (condition & (EVENT_IN | EVENT_HUP))
+               m_ch->remove_pending_event_id(m_event_id);
+
+               if (!m_ch->is_connected()) {
                        return false;
+               }
 
-               if (!m_ch->send_sync(m_msg))
+               if (condition & (EVENT_IN | EVENT_HUP)) {
                        return false;
+               }
 
-               if (m_msg)
-                       m_msg->unref();
+               if (!m_ch->send_sync(*m_msg)) {
+                       return false;
+               }
 
                return false;
        }
 
 private:
        channel *m_ch;
-       message *m_msg;
+       std::shared_ptr<message> m_msg;
 };
 
 class read_event_handler : public event_handler
@@ -69,16 +76,24 @@ public:
 
        bool handle(int fd, event_condition condition)
        {
-               message msg;
+               if (!m_ch) {
+                       return false;
+               }
+
+               m_ch->remove_pending_event_id(m_event_id);
 
-               if (!m_ch || !m_ch->is_connected())
+               if (!m_ch->is_connected()) {
                        return false;
+               }
 
-               if (condition & (EVENT_OUT | EVENT_HUP))
+               if (condition & (EVENT_OUT | EVENT_HUP)) {
                        return false;
+               }
 
-               if (!m_ch->read_sync(msg, false))
+               message msg;
+               if (!m_ch->read_sync(msg, false)) {
                        return false;
+               }
 
                return false;
        }
@@ -95,14 +110,29 @@ channel::channel(socket *sock)
 , m_loop(NULL)
 , m_connected(false)
 {
+       _D("Create[%p]", this);
 }
 
 channel::~channel()
 {
-       /* disconnect() should not be called here */
+       _D("Destroy[%p]", this);
+       if (is_connected()) {
+               disconnect();
+       }
+}
+
+uint64_t channel::bind(void)
+{
+       retv_if(!m_loop, 0);
+       m_event_id = m_loop->add_event(m_socket->get_fd(),
+                       (EVENT_IN | EVENT_HUP | EVENT_NVAL),
+                       dynamic_cast<channel_event_handler *>(m_handler));
+
+       _D("Bind channel[%p] : handler[%p] event_id[%llu]", this, m_handler, m_event_id);
+       return m_event_id;
 }
 
-void channel::bind(channel_handler *handler, event_loop *loop)
+uint64_t channel::bind(channel_handler *handler, event_loop *loop, bool loop_bind)
 {
        m_handler = handler;
        m_loop = loop;
@@ -110,82 +140,114 @@ void channel::bind(channel_handler *handler, event_loop *loop)
 
        if (m_handler)
                m_handler->connected(this);
-}
 
-void channel::bind(void)
-{
-       ret_if(!m_loop);
-       m_event_id = m_loop->add_event(m_socket->get_fd(),
-                       (EVENT_IN | EVENT_HUP | EVENT_NVAL),
-                       dynamic_cast<channel_event_handler *>(m_handler));
+       if (loop_bind)
+               bind();
+
+       return m_event_id;
 }
 
-bool channel::connect(channel_handler *handler, event_loop *loop)
+uint64_t channel::connect(channel_handler *handler, event_loop *loop, bool loop_bind)
 {
        if (!m_socket->connect())
                return false;
 
-       bind(handler, loop);
-       return true;
+       bind(handler, loop, loop_bind);
+
+       _D("Connect channel[%p] : event id[%llu]", this, m_event_id);
+       return m_event_id;
 }
 
 void channel::disconnect(void)
 {
-       ret_if(!is_connected());
+       AUTOLOCK(m_cmutex);
+       if (!is_connected()) {
+               _D("Channel[%p] is not connected", this);
+               return;
+       }
+
        m_connected.store(false);
 
+       _D("Disconnect channel[%p]", this);
+
        if (m_handler) {
+               _D("Disconnect channel[%p] handler[%p]", this, m_handler);
                m_handler->disconnected(this);
                m_handler = NULL;
        }
 
        if (m_loop) {
+               for(auto id : m_pending_event_id) {
+                       _D("Remove channel[%p] pending event id[%llu]", this, id);
+                       m_loop->remove_event(id, true);
+               }
+               _D("Remove channel[%p] event[%llu]",this, m_event_id);
                m_loop->remove_event(m_event_id, true);
-               m_loop = NULL;
                m_event_id = 0;
        }
 
        if (m_socket) {
+               _D("Release channel[%p] socket[%d]", this, m_socket->get_fd());
                delete m_socket;
                m_socket = NULL;
        }
+       _D("Channel[%p] is disconnected", this);
 }
 
-bool channel::send(message *msg)
+bool channel::send(std::shared_ptr<message> msg)
 {
+       int retry_cnt = 0;
+       int cur_buffer_size = 0;
+
        retv_if(!m_loop, false);
 
-       /* TODO: check buffer size(is there any linux api for this?) */
-       int cur_buffer_size = m_socket->get_current_buffer_size();
-       retvm_if(cur_buffer_size > SYSTEMD_SOCK_BUF_SIZE, false, "Failed to send data");
+       while (retry_cnt < 3) {
+               cur_buffer_size = m_socket->get_current_buffer_size();
+               if (cur_buffer_size <= SYSTEMD_SOCK_BUF_SIZE)
+                       break;
+               usleep(3000);
+               retry_cnt++;
+       }
+       retvm_if(retry_cnt >= 3, false, "Socket buffer[%d] is exceeded", cur_buffer_size);
 
        send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
        retvm_if(!handler, false, "Failed to allocate memory");
 
-       msg->ref();
-
-       m_loop->add_event(m_socket->get_fd(),
-                       (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler);
+       uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL), handler);
+       if (event_id == 0) {
+               _D("Failed to add send event handler");
+               delete handler;
+               return false;
+       }
 
+       m_pending_event_id.push_back(event_id);
        return true;
 }
 
-bool channel::send_sync(message *msg)
+bool channel::send_sync(message &msg)
 {
-       retv_if(!msg, false);
+       AUTOLOCK(m_cmutex);
+       if (!is_connected()) {
+               _D("Channel is not connected");
+               return false;
+       }
+
+       retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
 
        ssize_t size = 0;
-       char *buf = msg->body();
+       char *buf = msg.body();
 
        /* header */
-       size = m_socket->send(reinterpret_cast<void *>(msg->header()),
-                                  sizeof(message_header), true);
-       retv_if(size <= 0, false);
-       retv_if(msg->size() <= 0, true);
+       size = m_socket->send(reinterpret_cast<void *>(msg.header()),
+           sizeof(message_header), true);
+       retvm_if(size <= 0, false, "Failed to send header");
+
+       /* if body size is zero, skip to send body message */
+       retv_if(msg.size() == 0, true);
 
        /* body */
-       size = m_socket->send(buf, msg->size(), true);
-       retv_if(size <= 0, false);
+       size = m_socket->send(buf, msg.size(), true);
+       retvm_if(size <= 0, false, "Failed to send body");
 
        return true;
 }
@@ -197,21 +259,37 @@ bool channel::read(void)
        read_event_handler *handler = new(std::nothrow) read_event_handler(this);
        retvm_if(!handler, false, "Failed to allocate memory");
 
-       m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
+       uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
+       if (event_id == 0) {
+               _D("Failed to add read event handler");
+               delete handler;
+               return false;
+       }
 
+       m_pending_event_id.push_back(event_id);
        return true;
 }
 
 bool channel::read_sync(message &msg, bool select)
 {
+       AUTOLOCK(m_cmutex);
+       if (!is_connected()) {
+               _D("Channel is not connected");
+               return false;
+       }
+
        message_header header;
        ssize_t size = 0;
        char buf[MAX_MSG_CAPACITY];
 
        /* header */
        size = m_socket->recv(&header, sizeof(message_header), select);
-       retv_if(size <= 0, false);
-
+       if (size <= 0) {
+               if (size == -1) {
+                       disconnect();
+               }
+               return false;
+       }
        /* check error from header */
        if (m_handler && header.err != 0) {
                m_handler->error_caught(this, header.err);
@@ -220,9 +298,19 @@ bool channel::read_sync(message &msg, bool select)
        }
 
        /* body */
+       if (header.length >= MAX_MSG_CAPACITY) {
+               _E("header.length error %u", header.length);
+               return false;
+       }
+
        if (header.length > 0) {
                size = m_socket->recv(&buf, header.length, select);
-               retv_if(size <= 0, false);
+               if (size <= 0) {
+                       if (size == -1) {
+                               disconnect();
+                       }
+                       return false;
+               }
        }
 
        buf[header.length] = '\0';
@@ -281,7 +369,10 @@ int channel::get_fd(void) const
        return m_fd;
 }
 
-void channel::set_event_id(uint64_t id)
+void channel::remove_pending_event_id(uint64_t id)
 {
-       m_event_id = id;
+       auto it = std::find(m_pending_event_id.begin(), m_pending_event_id.end(), id);
+       if (it != m_pending_event_id.end()) {
+               m_pending_event_id.erase(it);
+       }
 }