#include <stdint.h>
#include <unistd.h>
#include <memory>
+#include <algorithm>
#include "sensor_log.h"
#include "channel_event_handler.h"
class send_event_handler : public event_handler
{
public:
- send_event_handler(channel *ch, message *msg)
+ send_event_handler(channel *ch, std::shared_ptr<message> msg)
: m_ch(ch)
, m_msg(msg)
{ }
bool handle(int fd, event_condition condition)
{
- if (!m_ch || !m_ch->is_connected())
+ if (!m_ch) {
return false;
+ }
- if (condition & (EVENT_IN | EVENT_HUP))
+ m_ch->remove_pending_event_id(m_event_id);
+
+ if (!m_ch->is_connected()) {
return false;
+ }
- if (!m_ch->send_sync(m_msg))
+ if (condition & (EVENT_IN | EVENT_HUP)) {
return false;
+ }
- if (m_msg)
- m_msg->unref();
+ if (!m_ch->send_sync(*m_msg)) {
+ return false;
+ }
return false;
}
private:
channel *m_ch;
- message *m_msg;
+ std::shared_ptr<message> m_msg;
};
class read_event_handler : public event_handler
bool handle(int fd, event_condition condition)
{
- message msg;
+ if (!m_ch) {
+ return false;
+ }
- if (!m_ch || !m_ch->is_connected())
+ m_ch->remove_pending_event_id(m_event_id);
+
+ if (!m_ch->is_connected()) {
return false;
+ }
- if (condition & (EVENT_OUT | EVENT_HUP))
+ if (condition & (EVENT_OUT | EVENT_HUP)) {
return false;
+ }
- if (!m_ch->read_sync(msg, false))
+ message msg;
+ if (!m_ch->read_sync(msg, false)) {
return false;
+ }
return false;
}
, m_loop(NULL)
, m_connected(false)
{
- _D("Created");
+ _D("Create[%p]", this);
}
channel::~channel()
{
- _D("Destroyed[%llu]", m_event_id);
- disconnect();
+ _D("Destroy[%p]", this);
+ if (is_connected()) {
+ disconnect();
+ }
}
uint64_t channel::bind(void)
(EVENT_IN | EVENT_HUP | EVENT_NVAL),
dynamic_cast<channel_event_handler *>(m_handler));
- _D("Bound[%llu]", m_event_id);
+ _D("Bind channel[%p] : handler[%p] event_id[%llu]", this, m_handler, m_event_id);
return m_event_id;
}
bind(handler, loop, loop_bind);
- _D("Connected[%llu]", m_event_id);
+ _D("Connect channel[%p] : event id[%llu]", this, m_event_id);
return m_event_id;
}
void channel::disconnect(void)
{
+ AUTOLOCK(m_cmutex);
if (!is_connected()) {
- _D("Channel is not connected");
+ _D("Channel[%p] is not connected", this);
return;
}
m_connected.store(false);
- _D("Disconnecting..[%llu]", m_event_id);
+ _D("Disconnect channel[%p]", this);
if (m_handler) {
+ _D("Disconnect channel[%p] handler[%p]", this, m_handler);
m_handler->disconnected(this);
m_handler = NULL;
}
if (m_loop) {
- _D("Remove event[%llu]", m_event_id);
+ for(auto id : m_pending_event_id) {
+ _D("Remove channel[%p] pending event id[%llu]", this, id);
+ m_loop->remove_event(id, true);
+ }
+ _D("Remove channel[%p] event[%llu]",this, m_event_id);
m_loop->remove_event(m_event_id, true);
- m_loop = NULL;
m_event_id = 0;
}
if (m_socket) {
- _D("Release socket[%d]", m_socket->get_fd());
+ _D("Release channel[%p] socket[%d]", this, m_socket->get_fd());
delete m_socket;
m_socket = NULL;
}
-
- _D("Disconnected");
+ _D("Channel[%p] is disconnected", this);
}
-bool channel::send(message *msg)
+bool channel::send(std::shared_ptr<message> msg)
{
int retry_cnt = 0;
int cur_buffer_size = 0;
send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
retvm_if(!handler, false, "Failed to allocate memory");
- msg->ref();
-
- m_loop->add_event(m_socket->get_fd(),
- (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler);
+ uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL), handler);
+ if (event_id == 0) {
+ _D("Failed to add send event handler");
+ delete handler;
+ return false;
+ }
+ m_pending_event_id.push_back(event_id);
return true;
}
-bool channel::send_sync(message *msg)
+bool channel::send_sync(message &msg)
{
- retvm_if(!msg, false, "Invalid message");
- retvm_if(msg->size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg->size());
+ AUTOLOCK(m_cmutex);
+ if (!is_connected()) {
+ _D("Channel is not connected");
+ return false;
+ }
+
+ retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
ssize_t size = 0;
- char *buf = msg->body();
+ char *buf = msg.body();
/* header */
- size = m_socket->send(reinterpret_cast<void *>(msg->header()),
+ size = m_socket->send(reinterpret_cast<void *>(msg.header()),
sizeof(message_header), true);
retvm_if(size <= 0, false, "Failed to send header");
/* if body size is zero, skip to send body message */
- retv_if(msg->size() == 0, true);
+ retv_if(msg.size() == 0, true);
/* body */
- size = m_socket->send(buf, msg->size(), true);
+ size = m_socket->send(buf, msg.size(), true);
retvm_if(size <= 0, false, "Failed to send body");
return true;
read_event_handler *handler = new(std::nothrow) read_event_handler(this);
retvm_if(!handler, false, "Failed to allocate memory");
- m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
+ uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
+ if (event_id == 0) {
+ _D("Failed to add read event handler");
+ delete handler;
+ return false;
+ }
+ m_pending_event_id.push_back(event_id);
return true;
}
bool channel::read_sync(message &msg, bool select)
{
+ AUTOLOCK(m_cmutex);
+ if (!is_connected()) {
+ _D("Channel is not connected");
+ return false;
+ }
+
message_header header;
ssize_t size = 0;
char buf[MAX_MSG_CAPACITY];
/* header */
size = m_socket->recv(&header, sizeof(message_header), select);
- retv_if(size <= 0, false);
-
+ if (size <= 0) {
+ if (size == -1) {
+ disconnect();
+ }
+ return false;
+ }
/* check error from header */
if (m_handler && header.err != 0) {
m_handler->error_caught(this, header.err);
if (header.length > 0) {
size = m_socket->recv(&buf, header.length, select);
- retv_if(size <= 0, false);
+ if (size <= 0) {
+ if (size == -1) {
+ disconnect();
+ }
+ return false;
+ }
}
buf[header.length] = '\0';
{
return m_fd;
}
+
+void channel::remove_pending_event_id(uint64_t id)
+{
+ auto it = std::find(m_pending_event_id.begin(), m_pending_event_id.end(), id);
+ if (it != m_pending_event_id.end()) {
+ m_pending_event_id.erase(it);
+ }
+}