From 638806e5844cc9401d0792e66caad4e07fb20040 Mon Sep 17 00:00:00 2001 From: Boram Bae Date: Tue, 3 Dec 2019 10:31:09 +0900 Subject: [PATCH] Use share_ptr to send message, Use reference to send_sync message * fix some memory leak Change-Id: I20bbc5e29fa1ed9f801b3f6bb904b49fea857506 Signed-off-by: Boram Bae --- src/client/sensor_internal.cpp | 3 +- src/client/sensor_listener.cpp | 18 +++++----- src/client/sensor_manager.cpp | 6 ++-- src/client/sensor_provider.cpp | 8 ++--- src/sensorctl/testcase/sensor_listener.cpp | 31 +++++++++++++++++ src/sensorctl/testcase/unit_ipc.cpp | 16 ++++----- src/server/application_sensor_handler.cpp | 8 ++--- src/server/fusion_sensor_handler.cpp | 2 +- src/server/fusion_sensor_handler.h | 2 +- src/server/sensor_handler.cpp | 21 +++--------- src/server/sensor_listener_proxy.cpp | 11 ++++--- src/server/sensor_listener_proxy.h | 8 ++--- src/server/sensor_manager.cpp | 16 ++++++--- src/server/sensor_observer.h | 3 +- src/server/server_channel_handler.cpp | 19 ++++++----- src/shared/channel.cpp | 39 +++++++++++----------- src/shared/channel.h | 4 +-- src/shared/ipc_server.cpp | 1 + src/shared/message.cpp | 53 ++++++++---------------------- src/shared/message.h | 9 ++++- 20 files changed, 144 insertions(+), 134 deletions(-) diff --git a/src/client/sensor_internal.cpp b/src/client/sensor_internal.cpp index 950c28d..0695e63 100644 --- a/src/client/sensor_internal.cpp +++ b/src/client/sensor_internal.cpp @@ -746,6 +746,7 @@ API int sensord_set_attribute_str(int handle, int attribute, const char *value, _E("Failed to set attribute[%d, %s]", attribute, value); return -EIO; } + _D("Set attribute ID[%d], attr[%d], len[%d]", listener->get_id(), attribute, len); return OP_SUCCESS; } @@ -763,7 +764,7 @@ API int sensord_get_attribute_str(int handle, int attribute, char **value, int* _E("Failed to get attribute[%d]", attribute); return -EIO; } - _D("Get attribute[%d, %d, %s]", listener->get_id(), attribute, *value); + _D("Get attribute ID[%d], attr[%d], len[%d]", listener->get_id(), attribute, *len); return OP_SUCCESS; } diff --git a/src/client/sensor_listener.cpp b/src/client/sensor_listener.cpp index c979244..612c1ad 100644 --- a/src/client/sensor_listener.cpp +++ b/src/client/sensor_listener.cpp @@ -224,7 +224,7 @@ bool sensor_listener::connect(void) memcpy(buf.sensor, m_sensor->get_uri().c_str(), m_sensor->get_uri().size()); msg.set_type(CMD_LISTENER_CONNECT); msg.enclose((const char *)&buf, sizeof(buf)); - m_evt_channel->send_sync(&msg); + m_evt_channel->send_sync(msg); m_evt_channel->read_sync(reply); reply.disclose((char *)&buf); @@ -364,7 +364,7 @@ int sensor_listener::start(void) msg.set_type(CMD_LISTENER_START); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); if (reply.header()->err < 0) { @@ -392,7 +392,7 @@ int sensor_listener::stop(void) msg.set_type(CMD_LISTENER_STOP); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); if (reply.header()->err < 0) { @@ -503,7 +503,7 @@ int sensor_listener::set_attribute(int attribute, int value) msg.set_type(CMD_LISTENER_SET_ATTR_INT); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); if (reply.header()->err < 0) @@ -527,7 +527,7 @@ int sensor_listener::get_attribute(int attribute, int* value) msg.set_type(CMD_LISTENER_GET_ATTR_INT); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); @@ -572,7 +572,7 @@ int sensor_listener::set_attribute(int attribute, const char *value, int len) msg.enclose((char *)buf, size); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); /* Message memory is released automatically after sending message, @@ -601,7 +601,7 @@ int sensor_listener::get_attribute(int attribute, char **value, int* len) msg.set_type(CMD_LISTENER_GET_ATTR_STR); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); if (reply.header()->err < 0) { @@ -640,7 +640,7 @@ int sensor_listener::get_sensor_data(sensor_data_t *data) msg.set_type(CMD_LISTENER_GET_DATA); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); reply.disclose((char *)&buf); @@ -672,7 +672,7 @@ int sensor_listener::get_sensor_data_list(sensor_data_t **data, int *count) msg.set_type(CMD_LISTENER_GET_DATA_LIST); msg.enclose((char *)&buf, sizeof(buf)); - m_cmd_channel->send_sync(&msg); + m_cmd_channel->send_sync(msg); m_cmd_channel->read_sync(reply); if (reply.header()->err < 0) { diff --git a/src/client/sensor_manager.cpp b/src/client/sensor_manager.cpp index f16601f..64b623c 100644 --- a/src/client/sensor_manager.cpp +++ b/src/client/sensor_manager.cpp @@ -207,7 +207,7 @@ bool sensor_manager::connect_channel(void) retvm_if(!m_mon_channel, false, "Failed to connect to server"); msg.set_type(CMD_MANAGER_CONNECT); - m_mon_channel->send_sync(&msg); + m_mon_channel->send_sync(msg); m_connected.store(true); @@ -290,7 +290,7 @@ bool sensor_manager::get_sensors_internal(void) msg.set_type(CMD_MANAGER_SENSOR_LIST); - ret = m_cmd_channel->send_sync(&msg); + ret = m_cmd_channel->send_sync(msg); retvm_if(!ret, false, "Failed to send message"); ret = m_cmd_channel->read_sync(reply); @@ -316,7 +316,7 @@ bool sensor_manager::has_privilege(std::string &uri) memcpy(buf.sensor, uri.c_str(), uri.size()); msg.enclose((const char *)&buf, sizeof(buf)); - ret = m_cmd_channel->send_sync(&msg); + ret = m_cmd_channel->send_sync(msg); retvm_if(!ret, false, "Failed to send message"); ret = m_cmd_channel->read_sync(reply); diff --git a/src/client/sensor_provider.cpp b/src/client/sensor_provider.cpp index a90f7c2..414e83f 100644 --- a/src/client/sensor_provider.cpp +++ b/src/client/sensor_provider.cpp @@ -100,7 +100,7 @@ int sensor_provider::serialize(sensor_info *info, char **bytes) info->serialize(*raw); - *bytes = new(std::nothrow) char[raw->size()]; + *bytes = (char *) malloc(raw->size()); retvm_if(!*bytes, -ENOMEM, "Failed to allocate memory"); std::copy(raw->begin(), raw->end(), *bytes); @@ -121,7 +121,7 @@ int sensor_provider::send_sensor_info(sensor_info *info) ipc::message msg((const char *)bytes, size); msg.set_type(CMD_PROVIDER_CONNECT); - m_channel->send_sync(&msg); + m_channel->send_sync(msg); return OP_SUCCESS; } @@ -174,7 +174,7 @@ int sensor_provider::publish(const sensor_data_t &data) msg.set_type(CMD_PROVIDER_PUBLISH); msg.enclose((const void *)(&data), sizeof(data)); - m_channel->send_sync(&msg); + m_channel->send_sync(msg); return OP_SUCCESS; } @@ -185,7 +185,7 @@ int sensor_provider::publish(const sensor_data_t data[], const int count) msg.set_type(CMD_PROVIDER_PUBLISH); msg.enclose((const void *)data, sizeof(sensor_data_t) * count); - m_channel->send_sync(&msg); + m_channel->send_sync(msg); return OP_SUCCESS; } diff --git a/src/sensorctl/testcase/sensor_listener.cpp b/src/sensorctl/testcase/sensor_listener.cpp index 6fc6bf4..70050b0 100644 --- a/src/sensorctl/testcase/sensor_listener.cpp +++ b/src/sensorctl/testcase/sensor_listener.cpp @@ -355,6 +355,37 @@ TESTCASE(sensor_listener, set_get_attribute_string_1) return true; } +#define BUF_SIZE 4000 +TESTCASE(sensor_listener, set_get_attribute_string_2) +{ + int err = 0; + bool ret = true; + int handle = 0; + char *value = NULL; + int len = 0; + sensor_t sensor = NULL; + int attr = 1; + char attr_value[BUF_SIZE] = {1, }; + attr_value[BUF_SIZE - 1] = 1; + + err = sensord_get_default_sensor(ACCELEROMETER_SENSOR, &sensor); + ASSERT_EQ(err, 0); + + handle = sensord_connect(sensor); + err = sensord_set_attribute_str(handle, attr, attr_value, BUF_SIZE); + ASSERT_EQ(err, 0); + + err = sensord_get_attribute_str(handle, attr, &value, &len); + ASSERT_EQ(err, 0); + ASSERT_EQ(len, BUF_SIZE); + + ret = sensord_disconnect(handle); + ASSERT_TRUE(ret); + + free(value); + return true; +} + TESTCASE(sensor_listener, set_get_get_attribute_string_1) { int err; diff --git a/src/sensorctl/testcase/unit_ipc.cpp b/src/sensorctl/testcase/unit_ipc.cpp index c11d439..10e11e4 100644 --- a/src/sensorctl/testcase/unit_ipc.cpp +++ b/src/sensorctl/testcase/unit_ipc.cpp @@ -61,7 +61,7 @@ public: { char buf[MAX_BUF_SIZE]; - message *reply = new(std::nothrow) message(); + auto reply = message::create(); RETM_IF(!reply, "Failed to allocate memory"); msg.disclose(buf); @@ -117,7 +117,7 @@ static bool run_ipc_client_sleep_1s(const char *str, int size, int count) msg.enclose(buf, MAX_BUF_SIZE); SLEEP_1S; - ch->send_sync(&msg); + ch->send_sync(msg); /* Test */ SLEEP_1S; @@ -162,7 +162,7 @@ static bool run_ipc_client_small_buffer(const char *str, int size, int count) _I("After: Buffer size : %d\n", buf_size); for (int i = 0; i < 1024; ++i) { - ch->send_sync(&msg); + ch->send_sync(msg); ch->read_sync(reply); } @@ -197,7 +197,7 @@ static bool run_ipc_client_1M(const char *str, int size, int count) SLEEP_1S; for (int i = 0; i < 256; ++i) { - ch->send_sync(&msg); + ch->send_sync(msg); ch->read_sync(reply); } @@ -224,7 +224,7 @@ public: char buf[MAX_BUF_SIZE]; msg.disclose(buf); - message *reply = new(std::nothrow) message(); + auto reply = message::create(); if (!reply) return; reply->enclose("TEXTTEXTTEXTTEXT", 16); @@ -278,7 +278,7 @@ static bool run_ipc_client_2_channel_message(const char *str, int size, int coun ASSERT_NE(ch[0], 0); msg.enclose("TESTTESTTEST", 12); - ch[0]->send_sync(&msg); + ch[0]->send_sync(msg); SLEEP_1S; ch[0]->read_sync(reply); reply.disclose(buf); @@ -289,7 +289,7 @@ static bool run_ipc_client_2_channel_message(const char *str, int size, int coun ASSERT_NE(ch[1], 0); msg.enclose("TESTTESTTEST", 12); - ch[1]->send_sync(&msg); + ch[1]->send_sync(msg); SLEEP_1S; ch[1]->read_sync(reply); reply.disclose(buf); @@ -338,7 +338,7 @@ static bool run_ipc_client(const char *str, int size, int count) char buf[MAX_BUF_SIZE]; msg.enclose("TESTTESTTEST", 12); - ch->send_sync(&msg); + ch->send_sync(msg); SLEEP_1S; diff --git a/src/server/application_sensor_handler.cpp b/src/server/application_sensor_handler.cpp index df5db08..fe1aac3 100644 --- a/src/server/application_sensor_handler.cpp +++ b/src/server/application_sensor_handler.cpp @@ -58,7 +58,7 @@ int application_sensor_handler::start(sensor_observer *ob) ipc::message msg; msg.set_type(CMD_PROVIDER_START); - m_ch->send_sync(&msg); + m_ch->send_sync(msg); m_started.store(true); _I("Started[%s]", m_info.get_uri().c_str()); @@ -75,7 +75,7 @@ int application_sensor_handler::stop(sensor_observer *ob) ipc::message msg; msg.set_type(CMD_PROVIDER_STOP); - m_ch->send_sync(&msg); + m_ch->send_sync(msg); m_started.store(false); _I("Stopped[%s]", m_info.get_uri().c_str()); @@ -119,7 +119,7 @@ int application_sensor_handler::set_interval(sensor_observer *ob, int32_t interv msg.set_type(CMD_PROVIDER_ATTR_INT); msg.enclose((const char *)&buf, sizeof(cmd_provider_attr_int_t)); - m_ch->send_sync(&msg); + m_ch->send_sync(msg); m_prev_interval = cur_interval; @@ -157,7 +157,7 @@ int application_sensor_handler::set_attribute(sensor_observer *ob, int32_t attr, msg.enclose((char *)buf, size); - m_ch->send_sync(&msg); + m_ch->send_sync(msg); _I("Set attribute[%d] to sensor[%s]", attr, m_info.get_uri().c_str()); update_attribute(attr, value, len); diff --git a/src/server/fusion_sensor_handler.cpp b/src/server/fusion_sensor_handler.cpp index 5d5a8d1..1ede44c 100644 --- a/src/server/fusion_sensor_handler.cpp +++ b/src/server/fusion_sensor_handler.cpp @@ -43,7 +43,7 @@ void fusion_sensor_handler::add_required_sensor(uint32_t id, sensor_handler *sen m_required_sensors.emplace(info.get_uri(), required_sensor(id, sensor)); } -int fusion_sensor_handler::update(const char *uri, ipc::message *msg) +int fusion_sensor_handler::update(const char *uri, std::shared_ptr msg) { retv_if(!m_sensor, -EINVAL); diff --git a/src/server/fusion_sensor_handler.h b/src/server/fusion_sensor_handler.h index 9feb67f..e5d6db3 100644 --- a/src/server/fusion_sensor_handler.h +++ b/src/server/fusion_sensor_handler.h @@ -50,7 +50,7 @@ public: void add_required_sensor(uint32_t id, sensor_handler *sensor); /* subscriber */ - int update(const char *uri, ipc::message *msg); + int update(const char *uri, std::shared_ptr msg); /* sensor interface */ const sensor_info &get_sensor_info(void); diff --git a/src/server/sensor_handler.cpp b/src/server/sensor_handler.cpp index 843c794..9ae43a3 100644 --- a/src/server/sensor_handler.cpp +++ b/src/server/sensor_handler.cpp @@ -81,9 +81,8 @@ int sensor_handler::notify(const char *uri, sensor_data_t *data, int len) if (observer_count() == 0) return OP_ERROR; - ipc::message *msg; + auto msg = ipc::message::create((char *)data, len); - msg = new(std::nothrow) ipc::message((char *)data, len); retvm_if(!msg, OP_ERROR, "Failed to allocate memory"); for (auto it = m_observers.begin(); it != m_observers.end(); ++it) @@ -91,10 +90,6 @@ int sensor_handler::notify(const char *uri, sensor_data_t *data, int len) set_cache(data, len); - if (msg->ref_count() == 0) { - msg->unref(); - } - return OP_SUCCESS; } @@ -141,8 +136,8 @@ bool sensor_handler::notify_attribute_changed(uint32_t id, int attribute, int va buf.attribute = attribute; buf.value = value; - ipc::message *msg; - msg = new(std::nothrow) ipc::message(); + auto msg = ipc::message::create(); + retvm_if(!msg, OP_ERROR, "Failed to allocate memory"); msg->set_type(CMD_LISTENER_SET_ATTR_INT); @@ -156,9 +151,6 @@ bool sensor_handler::notify_attribute_changed(uint32_t id, int attribute, int va } } - if (msg->ref_count() == 0) - msg->unref(); - return OP_SUCCESS; } @@ -173,8 +165,7 @@ bool sensor_handler::notify_attribute_changed(uint32_t id, int attribute, const buf = (cmd_listener_attr_str_t *) new(std::nothrow) char[size]; retvm_if(!buf, -ENOMEM, "Failed to allocate memory"); - ipc::message *msg; - msg = new(std::nothrow) ipc::message(); + auto msg = ipc::message::create(); retvm_if(!msg, OP_ERROR, "Failed to allocate memory"); buf->listener_id = id; @@ -194,10 +185,6 @@ bool sensor_handler::notify_attribute_changed(uint32_t id, int attribute, const } } - if (msg->ref_count() == 0) { - msg->unref(); - } - delete[] buf; return OP_SUCCESS; diff --git a/src/server/sensor_listener_proxy.cpp b/src/server/sensor_listener_proxy.cpp index f184e45..0862774 100644 --- a/src/server/sensor_listener_proxy.cpp +++ b/src/server/sensor_listener_proxy.cpp @@ -56,7 +56,7 @@ uint32_t sensor_listener_proxy::get_id(void) return m_id; } -int sensor_listener_proxy::update(const char *uri, ipc::message *msg) +int sensor_listener_proxy::update(const char *uri, std::shared_ptr msg) { retv_if(!m_ch || !m_ch->is_connected(), OP_CONTINUE); @@ -66,7 +66,7 @@ int sensor_listener_proxy::update(const char *uri, ipc::message *msg) return OP_CONTINUE; } -int sensor_listener_proxy::on_attribute_changed(ipc::message *msg) +int sensor_listener_proxy::on_attribute_changed(std::shared_ptr msg) { retv_if(!m_ch || !m_ch->is_connected(), OP_CONTINUE); _I("Proxy[%zu] call on_attribute_changed\n", get_id()); @@ -74,7 +74,7 @@ int sensor_listener_proxy::on_attribute_changed(ipc::message *msg) return OP_CONTINUE; } -void sensor_listener_proxy::update_event(ipc::message *msg) +void sensor_listener_proxy::update_event(std::shared_ptr msg) { /* TODO: check axis orientation */ msg->header()->type = CMD_LISTENER_EVENT; @@ -83,7 +83,7 @@ void sensor_listener_proxy::update_event(ipc::message *msg) m_ch->send(msg); } -void sensor_listener_proxy::update_accuracy(ipc::message *msg) +void sensor_listener_proxy::update_accuracy(std::shared_ptr msg) { sensor_data_t *data = reinterpret_cast(msg->body()); @@ -95,7 +95,8 @@ void sensor_listener_proxy::update_accuracy(ipc::message *msg) sensor_data_t acc_data; acc_data.accuracy = m_last_accuracy; - ipc::message *acc_msg = new(std::nothrow) ipc::message(); + auto acc_msg = ipc::message::create(); + retm_if(!acc_msg, "Failed to allocate memory"); acc_msg->header()->type = CMD_LISTENER_ACC_EVENT; diff --git a/src/server/sensor_listener_proxy.h b/src/server/sensor_listener_proxy.h index f87447b..084ffe2 100644 --- a/src/server/sensor_listener_proxy.h +++ b/src/server/sensor_listener_proxy.h @@ -38,8 +38,8 @@ public: uint32_t get_id(void); /* sensor observer */ - int update(const char *uri, ipc::message *msg); - int on_attribute_changed(ipc::message *msg); + int update(const char *uri, std::shared_ptr msg); + int on_attribute_changed(std::shared_ptr msg); int start(bool policy = false); int stop(bool policy = false); @@ -62,8 +62,8 @@ public: bool notify_attribute_changed(int attribute, const char *value, int len); private: - void update_event(ipc::message *msg); - void update_accuracy(ipc::message *msg); + void update_event(std::shared_ptr msg); + void update_accuracy(std::shared_ptr msg); uint32_t m_id; std::string m_uri; diff --git a/src/server/sensor_manager.cpp b/src/server/sensor_manager.cpp index f6cc1f5..6173800 100644 --- a/src/server/sensor_manager.cpp +++ b/src/server/sensor_manager.cpp @@ -115,7 +115,7 @@ int sensor_manager::serialize(sensor_info *info, char **bytes) info->serialize(*raw); - *bytes = new(std::nothrow) char[raw->size()]; + *bytes = (char *) malloc(raw->size()); retvm_if(!*bytes, -ENOMEM, "Failed to allocate memory"); std::copy(raw->begin(), raw->end(), *bytes); @@ -129,7 +129,7 @@ int sensor_manager::serialize(sensor_info *info, char **bytes) void sensor_manager::send(ipc::message &msg) { for (auto it = m_channels.begin(); it != m_channels.end(); ++it) - (*it)->send_sync(&msg); + (*it)->send_sync(msg); } void sensor_manager::send_added_msg(sensor_info *info) @@ -410,8 +410,16 @@ void sensor_manager::register_handler(physical_sensor_handler *sensor) handler->add_sensor(sensor); m_event_handlers[fd] = handler; - m_loop->add_event(fd, - ipc::EVENT_IN | ipc::EVENT_HUP | ipc::EVENT_NVAL, handler); + if (m_loop->add_event(fd, ipc::EVENT_IN | ipc::EVENT_HUP | ipc::EVENT_NVAL, handler) == 0) { + _D("Failed to add sensor event handler"); + handler->remove_sensor(sensor); + + auto iter = m_event_handlers.find(fd); + if (iter != m_event_handlers.end()) { + m_event_handlers.erase(iter); + } + delete handler; + } } void sensor_manager::show(void) diff --git a/src/server/sensor_observer.h b/src/server/sensor_observer.h index ee04c20..9c86061 100644 --- a/src/server/sensor_observer.h +++ b/src/server/sensor_observer.h @@ -28,8 +28,7 @@ class sensor_observer { public: virtual ~sensor_observer() {} - /* for performance, use message */ - virtual int update(const char *uri, ipc::message *msg) = 0; + virtual int update(const char *uri, std::shared_ptr msg) = 0; }; } diff --git a/src/server/server_channel_handler.cpp b/src/server/server_channel_handler.cpp index 69f15f4..0058c17 100644 --- a/src/server/server_channel_handler.cpp +++ b/src/server/server_channel_handler.cpp @@ -112,7 +112,7 @@ void server_channel_handler::read(channel *ch, message &msg) if (err != 0) { message reply(err); - ch->send_sync(&reply); + ch->send_sync(reply); } } @@ -133,7 +133,7 @@ int server_channel_handler::manager_get_sensor_list(channel *ch, message &msg) reply.enclose((const char *)bytes, size); reply.header()->err = OP_SUCCESS; - ch->send_sync(&reply); + ch->send_sync(reply); delete [] bytes; @@ -161,7 +161,7 @@ int server_channel_handler::listener_connect(channel *ch, message &msg) reply.enclose((const char *)&buf, sizeof(buf)); reply.header()->err = OP_SUCCESS; - if (!ch->send_sync(&reply)) + if (!ch->send_sync(reply)) return OP_ERROR; _I("Connected sensor_listener[fd(%d) -> id(%u)]", ch->get_fd(), listener_id); @@ -316,9 +316,10 @@ int server_channel_handler::listener_get_attr_int(ipc::channel *ch, ipc::message } if (ret == OP_SUCCESS) { - message reply((char *)&buf, sizeof(buf)); + message reply; + reply.enclose((char *)&buf, sizeof(buf)); reply.set_type(CMD_LISTENER_GET_ATTR_INT); - ret = ch->send_sync(&reply); + ret = ch->send_sync(reply); } else { ret = send_reply(ch, OP_ERROR); } @@ -369,7 +370,7 @@ int server_channel_handler::listener_get_attr_str(ipc::channel *ch, ipc::message reply.enclose((char *)reply_buf, size); reply.set_type(CMD_LISTENER_GET_ATTR_STR); - ret = ch->send_sync(&reply); + ret = ch->send_sync(reply); delete [] reply_buf; } else { ret = send_reply(ch, OP_ERROR); @@ -405,7 +406,7 @@ int server_channel_handler::listener_get_data(channel *ch, message &msg) reply.header()->err = OP_SUCCESS; reply.header()->type = CMD_LISTENER_GET_DATA; - ch->send_sync(&reply); + ch->send_sync(reply); free(data); @@ -447,7 +448,7 @@ int server_channel_handler::listener_get_data_list(ipc::channel *ch, ipc::messag reply.header()->err = OP_SUCCESS; reply.header()->type = CMD_LISTENER_GET_DATA_LIST; - ch->send_sync(&reply); + ch->send_sync(reply); free(data); free(reply_buf); @@ -514,7 +515,7 @@ int server_channel_handler::has_privileges(channel *ch, message &msg) int server_channel_handler::send_reply(channel *ch, int error) { message reply(error); - retvm_if(!ch->send_sync(&reply), OP_ERROR, "Failed to send reply"); + retvm_if(!ch->send_sync(reply), OP_ERROR, "Failed to send reply"); return OP_SUCCESS; } diff --git a/src/shared/channel.cpp b/src/shared/channel.cpp index f27e1d4..6df1d2f 100644 --- a/src/shared/channel.cpp +++ b/src/shared/channel.cpp @@ -33,7 +33,7 @@ using namespace ipc; class send_event_handler : public event_handler { public: - send_event_handler(channel *ch, message *msg) + send_event_handler(channel *ch, std::shared_ptr msg) : m_ch(ch) , m_msg(msg) { } @@ -46,18 +46,15 @@ public: if (condition & (EVENT_IN | EVENT_HUP)) return false; - if (!m_ch->send_sync(m_msg)) + if (!m_ch->send_sync(*m_msg)) return false; - if (m_msg) - m_msg->unref(); - return false; } private: channel *m_ch; - message *m_msg; + std::shared_ptr m_msg; }; class read_event_handler : public event_handler @@ -173,7 +170,7 @@ void channel::disconnect(void) _D("Disconnected"); } -bool channel::send(message *msg) +bool channel::send(std::shared_ptr msg) { int retry_cnt = 0; int cur_buffer_size = 0; @@ -192,32 +189,32 @@ bool channel::send(message *msg) send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg); retvm_if(!handler, false, "Failed to allocate memory"); - msg->ref(); - - m_loop->add_event(m_socket->get_fd(), - (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler); + if (m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler) == 0) { + _D("Failed to add send event handler"); + delete handler; + return false; + } return true; } -bool channel::send_sync(message *msg) +bool channel::send_sync(message &msg) { - retvm_if(!msg, false, "Invalid message"); - retvm_if(msg->size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg->size()); + retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size()); ssize_t size = 0; - char *buf = msg->body(); + char *buf = msg.body(); /* header */ - size = m_socket->send(reinterpret_cast(msg->header()), + size = m_socket->send(reinterpret_cast(msg.header()), sizeof(message_header), true); retvm_if(size <= 0, false, "Failed to send header"); /* if body size is zero, skip to send body message */ - retv_if(msg->size() == 0, true); + retv_if(msg.size() == 0, true); /* body */ - size = m_socket->send(buf, msg->size(), true); + size = m_socket->send(buf, msg.size(), true); retvm_if(size <= 0, false, "Failed to send body"); return true; @@ -230,7 +227,11 @@ bool channel::read(void) read_event_handler *handler = new(std::nothrow) read_event_handler(this); retvm_if(!handler, false, "Failed to allocate memory"); - m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler); + if (m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler) == 0) { + _D("Failed to add read event handler"); + delete handler; + return false; + } return true; } diff --git a/src/shared/channel.h b/src/shared/channel.h index efdc481..615fd26 100644 --- a/src/shared/channel.h +++ b/src/shared/channel.h @@ -46,8 +46,8 @@ public: bool is_connected(void); - bool send(message *msg); - bool send_sync(message *msg); + bool send(std::shared_ptr msg); + bool send_sync(message &msg); bool read(void); bool read_sync(message &msg, bool select = true); diff --git a/src/shared/ipc_server.cpp b/src/shared/ipc_server.cpp index 66431da..f847c4d 100644 --- a/src/shared/ipc_server.cpp +++ b/src/shared/ipc_server.cpp @@ -98,6 +98,7 @@ void ipc_server::register_acceptor(void) (event_condition)(EVENT_IN | EVENT_HUP | EVENT_NVAL), m_accept_handler); if (id == 0) { + _D("Failed to add accept event handler"); delete m_accept_handler; m_accept_handler = NULL; } diff --git a/src/shared/message.cpp b/src/shared/message.cpp index d7455eb..f28780d 100644 --- a/src/shared/message.cpp +++ b/src/shared/message.cpp @@ -29,10 +29,9 @@ using namespace ipc; static std::atomic sequence(0); message::message(size_t capacity) -: m_size(0) -, m_capacity(capacity) -, m_msg((char *)malloc(sizeof(char) * capacity)) -, ref_cnt(0) + : m_size(0) + , m_capacity(capacity) + , m_msg((char *)malloc(sizeof(char) * capacity)) { m_header.id = sequence++; m_header.type = UNDEFINED_TYPE; @@ -41,10 +40,9 @@ message::message(size_t capacity) } message::message(const void *msg, size_t sz) -: m_size(sz) -, m_capacity(sz) -, m_msg((char *)msg) -, ref_cnt(0) + : m_size(sz) + , m_capacity(sz) + , m_msg((char *)msg) { m_header.id = sequence++; m_header.type = UNDEFINED_TYPE; @@ -53,20 +51,18 @@ message::message(const void *msg, size_t sz) } message::message(const message &msg) -: m_size(msg.m_size) -, m_capacity(msg.m_capacity) -, m_msg((char *)malloc(sizeof(char) * msg.m_capacity)) -, ref_cnt(0) + : m_size(msg.m_size) + , m_capacity(msg.m_capacity) + , m_msg((char *)malloc(sizeof(char) * msg.m_capacity)) { ::memcpy(&m_header, &msg.m_header, sizeof(message_header)); ::memcpy(m_msg, msg.m_msg, msg.m_size); } message::message(int error) -: m_size(0) -, m_capacity(0) -, m_msg(NULL) -, ref_cnt(0) + : m_size(0) + , m_capacity(0) + , m_msg(NULL) { m_header.id = sequence++; m_header.type = UNDEFINED_TYPE; @@ -76,7 +72,7 @@ message::message(int error) message::~message() { - if (m_msg && ref_cnt == 0) { + if (m_msg) { free(m_msg); m_msg = NULL; } @@ -125,29 +121,6 @@ size_t message::size(void) return m_size; } -/* TODO: remove ref/unref and use reference counting automatically */ -void message::ref(void) -{ - ref_cnt++; -} - -void message::unref(void) -{ - ref_cnt--; - - if (ref_cnt > 0 || !m_msg) - return; - - free(m_msg); - m_msg = NULL; - delete this; -} - -int message::ref_count(void) -{ - return ref_cnt; -} - message_header *message::header(void) { return &m_header; diff --git a/src/shared/message.h b/src/shared/message.h index 118cf13..0d932b6 100644 --- a/src/shared/message.h +++ b/src/shared/message.h @@ -22,6 +22,7 @@ #include /* size_t */ #include +#include #define MAX_MSG_CAPACITY (16*1024) #define MAX_HEADER_RESERVED 3 @@ -38,6 +39,13 @@ typedef struct message_header { class message { public: + template + static std::shared_ptr create(Args&&... args) + noexcept(noexcept(message(std::forward(args)...))) + { + return std::shared_ptr(new (std::nothrow) message(std::forward(args)...)); + } + message(size_t capacity = MAX_MSG_CAPACITY); message(const void *msg, size_t size); message(const message &msg); @@ -66,7 +74,6 @@ private: size_t m_capacity; char *m_msg; - std::atomic ref_cnt; }; } -- 2.7.4