_E("Failed to set attribute[%d, %s]", attribute, value);
return -EIO;
}
+ _D("Set attribute ID[%d], attr[%d], len[%d]", listener->get_id(), attribute, len);
return OP_SUCCESS;
}
_E("Failed to get attribute[%d]", attribute);
return -EIO;
}
- _D("Get attribute[%d, %d, %s]", listener->get_id(), attribute, *value);
+ _D("Get attribute ID[%d], attr[%d], len[%d]", listener->get_id(), attribute, *len);
return OP_SUCCESS;
}
memcpy(buf.sensor, m_sensor->get_uri().c_str(), m_sensor->get_uri().size());
msg.set_type(CMD_LISTENER_CONNECT);
msg.enclose((const char *)&buf, sizeof(buf));
- m_evt_channel->send_sync(&msg);
+ m_evt_channel->send_sync(msg);
m_evt_channel->read_sync(reply);
reply.disclose((char *)&buf);
msg.set_type(CMD_LISTENER_START);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
if (reply.header()->err < 0) {
msg.set_type(CMD_LISTENER_STOP);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
if (reply.header()->err < 0) {
msg.set_type(CMD_LISTENER_SET_ATTR_INT);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
if (reply.header()->err < 0)
msg.set_type(CMD_LISTENER_GET_ATTR_INT);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
msg.enclose((char *)buf, size);
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
/* Message memory is released automatically after sending message,
msg.set_type(CMD_LISTENER_GET_ATTR_STR);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
if (reply.header()->err < 0) {
msg.set_type(CMD_LISTENER_GET_DATA);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
reply.disclose((char *)&buf);
msg.set_type(CMD_LISTENER_GET_DATA_LIST);
msg.enclose((char *)&buf, sizeof(buf));
- m_cmd_channel->send_sync(&msg);
+ m_cmd_channel->send_sync(msg);
m_cmd_channel->read_sync(reply);
if (reply.header()->err < 0) {
retvm_if(!m_mon_channel, false, "Failed to connect to server");
msg.set_type(CMD_MANAGER_CONNECT);
- m_mon_channel->send_sync(&msg);
+ m_mon_channel->send_sync(msg);
m_connected.store(true);
msg.set_type(CMD_MANAGER_SENSOR_LIST);
- ret = m_cmd_channel->send_sync(&msg);
+ ret = m_cmd_channel->send_sync(msg);
retvm_if(!ret, false, "Failed to send message");
ret = m_cmd_channel->read_sync(reply);
memcpy(buf.sensor, uri.c_str(), uri.size());
msg.enclose((const char *)&buf, sizeof(buf));
- ret = m_cmd_channel->send_sync(&msg);
+ ret = m_cmd_channel->send_sync(msg);
retvm_if(!ret, false, "Failed to send message");
ret = m_cmd_channel->read_sync(reply);
info->serialize(*raw);
- *bytes = new(std::nothrow) char[raw->size()];
+ *bytes = (char *) malloc(raw->size());
retvm_if(!*bytes, -ENOMEM, "Failed to allocate memory");
std::copy(raw->begin(), raw->end(), *bytes);
ipc::message msg((const char *)bytes, size);
msg.set_type(CMD_PROVIDER_CONNECT);
- m_channel->send_sync(&msg);
+ m_channel->send_sync(msg);
return OP_SUCCESS;
}
msg.set_type(CMD_PROVIDER_PUBLISH);
msg.enclose((const void *)(&data), sizeof(data));
- m_channel->send_sync(&msg);
+ m_channel->send_sync(msg);
return OP_SUCCESS;
}
msg.set_type(CMD_PROVIDER_PUBLISH);
msg.enclose((const void *)data, sizeof(sensor_data_t) * count);
- m_channel->send_sync(&msg);
+ m_channel->send_sync(msg);
return OP_SUCCESS;
}
return true;
}
+#define BUF_SIZE 4000
+TESTCASE(sensor_listener, set_get_attribute_string_2)
+{
+ int err = 0;
+ bool ret = true;
+ int handle = 0;
+ char *value = NULL;
+ int len = 0;
+ sensor_t sensor = NULL;
+ int attr = 1;
+ char attr_value[BUF_SIZE] = {1, };
+ attr_value[BUF_SIZE - 1] = 1;
+
+ err = sensord_get_default_sensor(ACCELEROMETER_SENSOR, &sensor);
+ ASSERT_EQ(err, 0);
+
+ handle = sensord_connect(sensor);
+ err = sensord_set_attribute_str(handle, attr, attr_value, BUF_SIZE);
+ ASSERT_EQ(err, 0);
+
+ err = sensord_get_attribute_str(handle, attr, &value, &len);
+ ASSERT_EQ(err, 0);
+ ASSERT_EQ(len, BUF_SIZE);
+
+ ret = sensord_disconnect(handle);
+ ASSERT_TRUE(ret);
+
+ free(value);
+ return true;
+}
+
TESTCASE(sensor_listener, set_get_get_attribute_string_1)
{
int err;
{
char buf[MAX_BUF_SIZE];
- message *reply = new(std::nothrow) message();
+ auto reply = message::create();
RETM_IF(!reply, "Failed to allocate memory");
msg.disclose(buf);
msg.enclose(buf, MAX_BUF_SIZE);
SLEEP_1S;
- ch->send_sync(&msg);
+ ch->send_sync(msg);
/* Test */
SLEEP_1S;
_I("After: Buffer size : %d\n", buf_size);
for (int i = 0; i < 1024; ++i) {
- ch->send_sync(&msg);
+ ch->send_sync(msg);
ch->read_sync(reply);
}
SLEEP_1S;
for (int i = 0; i < 256; ++i) {
- ch->send_sync(&msg);
+ ch->send_sync(msg);
ch->read_sync(reply);
}
char buf[MAX_BUF_SIZE];
msg.disclose(buf);
- message *reply = new(std::nothrow) message();
+ auto reply = message::create();
if (!reply) return;
reply->enclose("TEXTTEXTTEXTTEXT", 16);
ASSERT_NE(ch[0], 0);
msg.enclose("TESTTESTTEST", 12);
- ch[0]->send_sync(&msg);
+ ch[0]->send_sync(msg);
SLEEP_1S;
ch[0]->read_sync(reply);
reply.disclose(buf);
ASSERT_NE(ch[1], 0);
msg.enclose("TESTTESTTEST", 12);
- ch[1]->send_sync(&msg);
+ ch[1]->send_sync(msg);
SLEEP_1S;
ch[1]->read_sync(reply);
reply.disclose(buf);
char buf[MAX_BUF_SIZE];
msg.enclose("TESTTESTTEST", 12);
- ch->send_sync(&msg);
+ ch->send_sync(msg);
SLEEP_1S;
ipc::message msg;
msg.set_type(CMD_PROVIDER_START);
- m_ch->send_sync(&msg);
+ m_ch->send_sync(msg);
m_started.store(true);
_I("Started[%s]", m_info.get_uri().c_str());
ipc::message msg;
msg.set_type(CMD_PROVIDER_STOP);
- m_ch->send_sync(&msg);
+ m_ch->send_sync(msg);
m_started.store(false);
_I("Stopped[%s]", m_info.get_uri().c_str());
msg.set_type(CMD_PROVIDER_ATTR_INT);
msg.enclose((const char *)&buf, sizeof(cmd_provider_attr_int_t));
- m_ch->send_sync(&msg);
+ m_ch->send_sync(msg);
m_prev_interval = cur_interval;
msg.enclose((char *)buf, size);
- m_ch->send_sync(&msg);
+ m_ch->send_sync(msg);
_I("Set attribute[%d] to sensor[%s]", attr, m_info.get_uri().c_str());
update_attribute(attr, value, len);
m_required_sensors.emplace(info.get_uri(), required_sensor(id, sensor));
}
-int fusion_sensor_handler::update(const char *uri, ipc::message *msg)
+int fusion_sensor_handler::update(const char *uri, std::shared_ptr<ipc::message> msg)
{
retv_if(!m_sensor, -EINVAL);
void add_required_sensor(uint32_t id, sensor_handler *sensor);
/* subscriber */
- int update(const char *uri, ipc::message *msg);
+ int update(const char *uri, std::shared_ptr<ipc::message> msg);
/* sensor interface */
const sensor_info &get_sensor_info(void);
if (observer_count() == 0)
return OP_ERROR;
- ipc::message *msg;
+ auto msg = ipc::message::create((char *)data, len);
- msg = new(std::nothrow) ipc::message((char *)data, len);
retvm_if(!msg, OP_ERROR, "Failed to allocate memory");
for (auto it = m_observers.begin(); it != m_observers.end(); ++it)
set_cache(data, len);
- if (msg->ref_count() == 0) {
- msg->unref();
- }
-
return OP_SUCCESS;
}
buf.attribute = attribute;
buf.value = value;
- ipc::message *msg;
- msg = new(std::nothrow) ipc::message();
+ auto msg = ipc::message::create();
+
retvm_if(!msg, OP_ERROR, "Failed to allocate memory");
msg->set_type(CMD_LISTENER_SET_ATTR_INT);
}
}
- if (msg->ref_count() == 0)
- msg->unref();
-
return OP_SUCCESS;
}
buf = (cmd_listener_attr_str_t *) new(std::nothrow) char[size];
retvm_if(!buf, -ENOMEM, "Failed to allocate memory");
- ipc::message *msg;
- msg = new(std::nothrow) ipc::message();
+ auto msg = ipc::message::create();
retvm_if(!msg, OP_ERROR, "Failed to allocate memory");
buf->listener_id = id;
}
}
- if (msg->ref_count() == 0) {
- msg->unref();
- }
-
delete[] buf;
return OP_SUCCESS;
return m_id;
}
-int sensor_listener_proxy::update(const char *uri, ipc::message *msg)
+int sensor_listener_proxy::update(const char *uri, std::shared_ptr<ipc::message> msg)
{
retv_if(!m_ch || !m_ch->is_connected(), OP_CONTINUE);
return OP_CONTINUE;
}
-int sensor_listener_proxy::on_attribute_changed(ipc::message *msg)
+int sensor_listener_proxy::on_attribute_changed(std::shared_ptr<ipc::message> msg)
{
retv_if(!m_ch || !m_ch->is_connected(), OP_CONTINUE);
_I("Proxy[%zu] call on_attribute_changed\n", get_id());
return OP_CONTINUE;
}
-void sensor_listener_proxy::update_event(ipc::message *msg)
+void sensor_listener_proxy::update_event(std::shared_ptr<ipc::message> msg)
{
/* TODO: check axis orientation */
msg->header()->type = CMD_LISTENER_EVENT;
m_ch->send(msg);
}
-void sensor_listener_proxy::update_accuracy(ipc::message *msg)
+void sensor_listener_proxy::update_accuracy(std::shared_ptr<ipc::message> msg)
{
sensor_data_t *data = reinterpret_cast<sensor_data_t *>(msg->body());
sensor_data_t acc_data;
acc_data.accuracy = m_last_accuracy;
- ipc::message *acc_msg = new(std::nothrow) ipc::message();
+ auto acc_msg = ipc::message::create();
+
retm_if(!acc_msg, "Failed to allocate memory");
acc_msg->header()->type = CMD_LISTENER_ACC_EVENT;
uint32_t get_id(void);
/* sensor observer */
- int update(const char *uri, ipc::message *msg);
- int on_attribute_changed(ipc::message *msg);
+ int update(const char *uri, std::shared_ptr<ipc::message> msg);
+ int on_attribute_changed(std::shared_ptr<ipc::message> msg);
int start(bool policy = false);
int stop(bool policy = false);
bool notify_attribute_changed(int attribute, const char *value, int len);
private:
- void update_event(ipc::message *msg);
- void update_accuracy(ipc::message *msg);
+ void update_event(std::shared_ptr<ipc::message> msg);
+ void update_accuracy(std::shared_ptr<ipc::message> msg);
uint32_t m_id;
std::string m_uri;
info->serialize(*raw);
- *bytes = new(std::nothrow) char[raw->size()];
+ *bytes = (char *) malloc(raw->size());
retvm_if(!*bytes, -ENOMEM, "Failed to allocate memory");
std::copy(raw->begin(), raw->end(), *bytes);
void sensor_manager::send(ipc::message &msg)
{
for (auto it = m_channels.begin(); it != m_channels.end(); ++it)
- (*it)->send_sync(&msg);
+ (*it)->send_sync(msg);
}
void sensor_manager::send_added_msg(sensor_info *info)
handler->add_sensor(sensor);
m_event_handlers[fd] = handler;
- m_loop->add_event(fd,
- ipc::EVENT_IN | ipc::EVENT_HUP | ipc::EVENT_NVAL, handler);
+ if (m_loop->add_event(fd, ipc::EVENT_IN | ipc::EVENT_HUP | ipc::EVENT_NVAL, handler) == 0) {
+ _D("Failed to add sensor event handler");
+ handler->remove_sensor(sensor);
+
+ auto iter = m_event_handlers.find(fd);
+ if (iter != m_event_handlers.end()) {
+ m_event_handlers.erase(iter);
+ }
+ delete handler;
+ }
}
void sensor_manager::show(void)
public:
virtual ~sensor_observer() {}
- /* for performance, use message */
- virtual int update(const char *uri, ipc::message *msg) = 0;
+ virtual int update(const char *uri, std::shared_ptr<ipc::message> msg) = 0;
};
}
if (err != 0) {
message reply(err);
- ch->send_sync(&reply);
+ ch->send_sync(reply);
}
}
reply.enclose((const char *)bytes, size);
reply.header()->err = OP_SUCCESS;
- ch->send_sync(&reply);
+ ch->send_sync(reply);
delete [] bytes;
reply.enclose((const char *)&buf, sizeof(buf));
reply.header()->err = OP_SUCCESS;
- if (!ch->send_sync(&reply))
+ if (!ch->send_sync(reply))
return OP_ERROR;
_I("Connected sensor_listener[fd(%d) -> id(%u)]", ch->get_fd(), listener_id);
}
if (ret == OP_SUCCESS) {
- message reply((char *)&buf, sizeof(buf));
+ message reply;
+ reply.enclose((char *)&buf, sizeof(buf));
reply.set_type(CMD_LISTENER_GET_ATTR_INT);
- ret = ch->send_sync(&reply);
+ ret = ch->send_sync(reply);
} else {
ret = send_reply(ch, OP_ERROR);
}
reply.enclose((char *)reply_buf, size);
reply.set_type(CMD_LISTENER_GET_ATTR_STR);
- ret = ch->send_sync(&reply);
+ ret = ch->send_sync(reply);
delete [] reply_buf;
} else {
ret = send_reply(ch, OP_ERROR);
reply.header()->err = OP_SUCCESS;
reply.header()->type = CMD_LISTENER_GET_DATA;
- ch->send_sync(&reply);
+ ch->send_sync(reply);
free(data);
reply.header()->err = OP_SUCCESS;
reply.header()->type = CMD_LISTENER_GET_DATA_LIST;
- ch->send_sync(&reply);
+ ch->send_sync(reply);
free(data);
free(reply_buf);
int server_channel_handler::send_reply(channel *ch, int error)
{
message reply(error);
- retvm_if(!ch->send_sync(&reply), OP_ERROR, "Failed to send reply");
+ retvm_if(!ch->send_sync(reply), OP_ERROR, "Failed to send reply");
return OP_SUCCESS;
}
class send_event_handler : public event_handler
{
public:
- send_event_handler(channel *ch, message *msg)
+ send_event_handler(channel *ch, std::shared_ptr<message> msg)
: m_ch(ch)
, m_msg(msg)
{ }
if (condition & (EVENT_IN | EVENT_HUP))
return false;
- if (!m_ch->send_sync(m_msg))
+ if (!m_ch->send_sync(*m_msg))
return false;
- if (m_msg)
- m_msg->unref();
-
return false;
}
private:
channel *m_ch;
- message *m_msg;
+ std::shared_ptr<message> m_msg;
};
class read_event_handler : public event_handler
_D("Disconnected");
}
-bool channel::send(message *msg)
+bool channel::send(std::shared_ptr<message> msg)
{
int retry_cnt = 0;
int cur_buffer_size = 0;
send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
retvm_if(!handler, false, "Failed to allocate memory");
- msg->ref();
-
- m_loop->add_event(m_socket->get_fd(),
- (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler);
+ if (m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler) == 0) {
+ _D("Failed to add send event handler");
+ delete handler;
+ return false;
+ }
return true;
}
-bool channel::send_sync(message *msg)
+bool channel::send_sync(message &msg)
{
- retvm_if(!msg, false, "Invalid message");
- retvm_if(msg->size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg->size());
+ retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
ssize_t size = 0;
- char *buf = msg->body();
+ char *buf = msg.body();
/* header */
- size = m_socket->send(reinterpret_cast<void *>(msg->header()),
+ size = m_socket->send(reinterpret_cast<void *>(msg.header()),
sizeof(message_header), true);
retvm_if(size <= 0, false, "Failed to send header");
/* if body size is zero, skip to send body message */
- retv_if(msg->size() == 0, true);
+ retv_if(msg.size() == 0, true);
/* body */
- size = m_socket->send(buf, msg->size(), true);
+ size = m_socket->send(buf, msg.size(), true);
retvm_if(size <= 0, false, "Failed to send body");
return true;
read_event_handler *handler = new(std::nothrow) read_event_handler(this);
retvm_if(!handler, false, "Failed to allocate memory");
- m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
+ if (m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler) == 0) {
+ _D("Failed to add read event handler");
+ delete handler;
+ return false;
+ }
return true;
}
bool is_connected(void);
- bool send(message *msg);
- bool send_sync(message *msg);
+ bool send(std::shared_ptr<message> msg);
+ bool send_sync(message &msg);
bool read(void);
bool read_sync(message &msg, bool select = true);
(event_condition)(EVENT_IN | EVENT_HUP | EVENT_NVAL), m_accept_handler);
if (id == 0) {
+ _D("Failed to add accept event handler");
delete m_accept_handler;
m_accept_handler = NULL;
}
static std::atomic<uint64_t> sequence(0);
message::message(size_t capacity)
-: m_size(0)
-, m_capacity(capacity)
-, m_msg((char *)malloc(sizeof(char) * capacity))
-, ref_cnt(0)
+ : m_size(0)
+ , m_capacity(capacity)
+ , m_msg((char *)malloc(sizeof(char) * capacity))
{
m_header.id = sequence++;
m_header.type = UNDEFINED_TYPE;
}
message::message(const void *msg, size_t sz)
-: m_size(sz)
-, m_capacity(sz)
-, m_msg((char *)msg)
-, ref_cnt(0)
+ : m_size(sz)
+ , m_capacity(sz)
+ , m_msg((char *)msg)
{
m_header.id = sequence++;
m_header.type = UNDEFINED_TYPE;
}
message::message(const message &msg)
-: m_size(msg.m_size)
-, m_capacity(msg.m_capacity)
-, m_msg((char *)malloc(sizeof(char) * msg.m_capacity))
-, ref_cnt(0)
+ : m_size(msg.m_size)
+ , m_capacity(msg.m_capacity)
+ , m_msg((char *)malloc(sizeof(char) * msg.m_capacity))
{
::memcpy(&m_header, &msg.m_header, sizeof(message_header));
::memcpy(m_msg, msg.m_msg, msg.m_size);
}
message::message(int error)
-: m_size(0)
-, m_capacity(0)
-, m_msg(NULL)
-, ref_cnt(0)
+ : m_size(0)
+ , m_capacity(0)
+ , m_msg(NULL)
{
m_header.id = sequence++;
m_header.type = UNDEFINED_TYPE;
message::~message()
{
- if (m_msg && ref_cnt == 0) {
+ if (m_msg) {
free(m_msg);
m_msg = NULL;
}
return m_size;
}
-/* TODO: remove ref/unref and use reference counting automatically */
-void message::ref(void)
-{
- ref_cnt++;
-}
-
-void message::unref(void)
-{
- ref_cnt--;
-
- if (ref_cnt > 0 || !m_msg)
- return;
-
- free(m_msg);
- m_msg = NULL;
- delete this;
-}
-
-int message::ref_count(void)
-{
- return ref_cnt;
-}
-
message_header *message::header(void)
{
return &m_header;
#include <stdlib.h> /* size_t */
#include <atomic>
+#include <memory>
#define MAX_MSG_CAPACITY (16*1024)
#define MAX_HEADER_RESERVED 3
class message {
public:
+ template <class... Args>
+ static std::shared_ptr<message> create(Args&&... args)
+ noexcept(noexcept(message(std::forward<Args>(args)...)))
+ {
+ return std::shared_ptr<message>(new (std::nothrow) message(std::forward<Args>(args)...));
+ }
+
message(size_t capacity = MAX_MSG_CAPACITY);
message(const void *msg, size_t size);
message(const message &msg);
size_t m_capacity;
char *m_msg;
- std::atomic<int> ref_cnt;
};
}