From eae39a262b2ec340fd928181fdb2f39e38072cf0 Mon Sep 17 00:00:00 2001 From: "kibak.yoon" Date: Tue, 4 Apr 2017 21:15:45 +0900 Subject: [PATCH] sensord: ipc: add channel class - channel class is a nexus to a socket which is capable of I/O operations, such as read, write, connect, and bind. - channel class provides: - the current state of the channel. - the configuration parameters of the channel. - the I/O operations that the channel supports. - channel class can support synchronous/asynchronous send/read call. - channel_handler: - is called by all I/O events(connect/disconnect/read and error) Change-Id: I85365e7cb443e969653353a83f8110ab690c4303 Signed-off-by: kibak.yoon --- src/shared/channel.cpp | 276 +++++++++++++++++++++++++++++++++++++++++++ src/shared/channel.h | 72 +++++++++++ src/shared/channel_handler.h | 41 +++++++ 3 files changed, 389 insertions(+) create mode 100644 src/shared/channel.cpp create mode 100644 src/shared/channel.h create mode 100644 src/shared/channel_handler.h diff --git a/src/shared/channel.cpp b/src/shared/channel.cpp new file mode 100644 index 0000000..b56d49e --- /dev/null +++ b/src/shared/channel.cpp @@ -0,0 +1,276 @@ +/* + * sensord + * + * Copyright (c) 2017 Samsung Electronics Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "channel.h" + +#include +#include +#include + +#include "sensor_log.h" + +using namespace ipc; + +class send_event_handler : public event_handler +{ +public: + send_event_handler(channel *ch, message *msg) + : m_ch(ch) + , m_msg(msg) + { } + + bool handle(int fd, event_condition condition) + { + if (!m_ch || !m_ch->is_connected()) + return false; + + if (condition & (EVENT_IN | EVENT_HUP)) + return false; + + if (!m_ch->send_sync(m_msg)) + return false; + + if (m_msg) + m_msg->unref(); + + return false; + } + +private: + channel *m_ch; + message *m_msg; +}; + +class read_event_handler : public event_handler +{ +public: + read_event_handler(channel *ch) + : m_ch(ch) + { } + + bool handle(int fd, event_condition condition) + { + message msg; + + if (!m_ch || !m_ch->is_connected()) + return false; + + if (condition & (EVENT_OUT | EVENT_HUP)) + return false; + + if (!m_ch->read_sync(msg)) + return false; + + return false; + } + +private: + channel *m_ch; +}; + +channel::channel(socket *sock) +: m_fd(sock->get_fd()) +, m_event_id(0) +, m_socket(sock) +, m_handler(NULL) +, m_loop(NULL) +, m_connected(false) +{ +} + +channel::~channel() +{ + /* disconnect() should not be called here */ +} + +void channel::bind(channel_handler *handler, event_loop *loop) +{ + m_handler = handler; + m_loop = loop; + m_connected.store(true); + + if (m_handler) + m_handler->connected(this); +} + +bool channel::connect(channel_handler *handler, event_loop *loop) +{ + if (!m_socket->connect()) + return false; + + bind(handler, loop); + return true; +} + +void channel::disconnect(void) +{ + ret_if(!is_connected()); + m_connected.store(false); + + if (m_handler) { + m_handler->disconnected(this); + m_handler = NULL; + } + + if (m_loop) { + m_loop->remove_event(m_event_id, true); + m_loop = NULL; + m_event_id = 0; + } + + if (m_socket) { + delete m_socket; + m_socket = NULL; + } +} + +bool channel::send(message *msg) +{ + retv_if(!m_loop, false); + + /* TODO: check buffer size(is there any linux api for this?) */ + int cur_buffer_size = m_socket->get_current_buffer_size(); + retvm_if(!cur_buffer_size > 40000, false, "Failed to send data"); + + send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg); + retvm_if(!handler, false, "Failed to allocate memory"); + + msg->ref(); + + m_loop->add_event(m_socket->get_fd(), + (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler); + + return true; +} + +bool channel::send_sync(message *msg) +{ + retv_if(!msg, false); + + ssize_t size = 0; + char *buf = msg->body(); + + /* header */ + size = m_socket->send(reinterpret_cast(msg->header()), + sizeof(message_header), true); + retv_if(size <= 0, false); + retv_if(msg->size() <= 0, true); + + /* body */ + size = m_socket->send(buf, msg->size(), true); + retv_if(size <= 0, false); + + return true; +} + +bool channel::read(void) +{ + retv_if(!m_loop, false); + + read_event_handler *handler = new(std::nothrow) read_event_handler(this); + retvm_if(!handler, false, "Failed to allocate memory"); + + m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler); + + return true; +} + +bool channel::read_sync(message &msg) +{ + message_header header; + ssize_t size = 0; + char buf[MAX_MSG_CAPACITY]; + + /* header */ + size = m_socket->recv(&header, sizeof(message_header), true); + retv_if(size <= 0, false); + + /* check error from header */ + if (m_handler && header.err != 0) { + m_handler->error_caught(this, header.err); + msg.header()->err = header.err; + return true; + } + + /* body */ + if (header.length > 0) { + size = m_socket->recv(&buf, header.length, true); + retv_if(size <= 0, false); + } + + buf[header.length] = '\0'; + msg.enclose(reinterpret_cast(buf), header.length); + msg.set_type(header.type); + msg.header()->err = header.err; + + if (m_handler) + m_handler->read(this, msg); + + return true; +} + +bool channel::is_connected(void) +{ + return m_connected.load(); +} + +bool channel::set_option(int type, int value) +{ + switch (type) { + case SO_SNDBUF: + m_socket->set_buffer_size(type, value); + break; + case SO_RCVBUF: + m_socket->set_buffer_size(type, value); + break; + default: + break; + } + + return true; +} + +bool channel::get_option(int type, int &value) const +{ + switch (type) { + case 0: + value = m_socket->get_current_buffer_size(); + break; + case SO_SNDBUF: + value = m_socket->get_buffer_size(type); + break; + case SO_RCVBUF: + value = m_socket->get_buffer_size(type); + break; + default: + break; + } + + return true; +} + +int channel::get_fd(void) const +{ + return m_fd; +} + +void channel::set_event_id(uint64_t id) +{ + m_event_id = id; +} diff --git a/src/shared/channel.h b/src/shared/channel.h new file mode 100644 index 0000000..e37a7a9 --- /dev/null +++ b/src/shared/channel.h @@ -0,0 +1,72 @@ +/* + * sensord + * + * Copyright (c) 2017 Samsung Electronics Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef __CHANNEL_H__ +#define __CHANNEL_H__ + +#include +#include + +#include "socket.h" +#include "message.h" +#include "event_loop.h" +#include "channel_handler.h" + +namespace ipc { + +class channel_handler; + +class channel { +public: + /* move owernership of the socket to the channel */ + channel(socket *sock); + ~channel(); + + void bind(channel_handler *handler, event_loop *loop); + + bool connect(channel_handler *handler, event_loop *loop); + void disconnect(void); + + bool is_connected(void); + + bool send(message *msg); + bool send_sync(message *msg); + + bool read(void); + bool read_sync(message &msg); + + bool get_option(int type, int &value) const; + bool set_option(int type, int value); + + int get_fd(void) const; + void set_event_id(uint64_t id); + +private: + int m_fd; + uint64_t m_event_id; + socket *m_socket; + channel_handler *m_handler; + event_loop *m_loop; + + std::atomic m_connected; +}; + +} + +#endif /* __CHANNEL_H__ */ diff --git a/src/shared/channel_handler.h b/src/shared/channel_handler.h new file mode 100644 index 0000000..b420fbd --- /dev/null +++ b/src/shared/channel_handler.h @@ -0,0 +1,41 @@ +/* + * sensord + * + * Copyright (c) 2017 Samsung Electronics Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef __CHANNEL_HANDLER_H__ +#define __CHANNEL_HANDLER_H__ + +namespace ipc { + +class channel; +class message; + +class channel_handler { +public: + virtual ~channel_handler() {} + + virtual void connected(channel *ch) = 0; + virtual void disconnected(channel *ch) = 0; + virtual void read(channel *ch, message &msg) = 0; + virtual void read_complete(channel *ch) = 0; + virtual void error_caught(channel *ch, int error) = 0; +}; + +} + +#endif /* __CHANNEL_HANDLER_H__ */ -- 2.7.4