sensord: ipc: add channel class 57/123157/2
authorkibak.yoon <kibak.yoon@samsung.com>
Tue, 4 Apr 2017 12:15:45 +0000 (21:15 +0900)
committerkibak.yoon <kibak.yoon@samsung.com>
Wed, 5 Apr 2017 00:53:13 +0000 (09:53 +0900)
- channel class is a nexus to a socket which is capable of I/O operations,
  such as read, write, connect, and bind.
- channel class provides:
  - the current state of the channel.
  - the configuration parameters of the channel.
  - the I/O operations that the channel supports.
- channel class can support synchronous/asynchronous send/read call.
- channel_handler:
  - is called by all I/O events(connect/disconnect/read and error)

Change-Id: I85365e7cb443e969653353a83f8110ab690c4303
Signed-off-by: kibak.yoon <kibak.yoon@samsung.com>
src/shared/channel.cpp [new file with mode: 0644]
src/shared/channel.h [new file with mode: 0644]
src/shared/channel_handler.h [new file with mode: 0644]

diff --git a/src/shared/channel.cpp b/src/shared/channel.cpp
new file mode 100644 (file)
index 0000000..b56d49e
--- /dev/null
@@ -0,0 +1,276 @@
+/*
+ * sensord
+ *
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "channel.h"
+
+#include <stdint.h>
+#include <unistd.h>
+#include <memory>
+
+#include "sensor_log.h"
+
+using namespace ipc;
+
+class send_event_handler : public event_handler
+{
+public:
+       send_event_handler(channel *ch, message *msg)
+       : m_ch(ch)
+       , m_msg(msg)
+       { }
+
+       bool handle(int fd, event_condition condition)
+       {
+               if (!m_ch || !m_ch->is_connected())
+                       return false;
+
+               if (condition & (EVENT_IN | EVENT_HUP))
+                       return false;
+
+               if (!m_ch->send_sync(m_msg))
+                       return false;
+
+               if (m_msg)
+                       m_msg->unref();
+
+               return false;
+       }
+
+private:
+       channel *m_ch;
+       message *m_msg;
+};
+
+class read_event_handler : public event_handler
+{
+public:
+       read_event_handler(channel *ch)
+       : m_ch(ch)
+       { }
+
+       bool handle(int fd, event_condition condition)
+       {
+               message msg;
+
+               if (!m_ch || !m_ch->is_connected())
+                       return false;
+
+               if (condition & (EVENT_OUT | EVENT_HUP))
+                       return false;
+
+               if (!m_ch->read_sync(msg))
+                       return false;
+
+               return false;
+       }
+
+private:
+       channel *m_ch;
+};
+
+channel::channel(socket *sock)
+: m_fd(sock->get_fd())
+, m_event_id(0)
+, m_socket(sock)
+, m_handler(NULL)
+, m_loop(NULL)
+, m_connected(false)
+{
+}
+
+channel::~channel()
+{
+       /* disconnect() should not be called here */
+}
+
+void channel::bind(channel_handler *handler, event_loop *loop)
+{
+       m_handler = handler;
+       m_loop = loop;
+       m_connected.store(true);
+
+       if (m_handler)
+               m_handler->connected(this);
+}
+
+bool channel::connect(channel_handler *handler, event_loop *loop)
+{
+       if (!m_socket->connect())
+               return false;
+
+       bind(handler, loop);
+       return true;
+}
+
+void channel::disconnect(void)
+{
+       ret_if(!is_connected());
+       m_connected.store(false);
+
+       if (m_handler) {
+               m_handler->disconnected(this);
+               m_handler = NULL;
+       }
+
+       if (m_loop) {
+               m_loop->remove_event(m_event_id, true);
+               m_loop = NULL;
+               m_event_id = 0;
+       }
+
+       if (m_socket) {
+               delete m_socket;
+               m_socket = NULL;
+       }
+}
+
+bool channel::send(message *msg)
+{
+       retv_if(!m_loop, false);
+
+       /* TODO: check buffer size(is there any linux api for this?) */
+       int cur_buffer_size = m_socket->get_current_buffer_size();
+       retvm_if(!cur_buffer_size > 40000, false, "Failed to send data");
+
+       send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
+       retvm_if(!handler, false, "Failed to allocate memory");
+
+       msg->ref();
+
+       m_loop->add_event(m_socket->get_fd(),
+                       (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler);
+
+       return true;
+}
+
+bool channel::send_sync(message *msg)
+{
+       retv_if(!msg, false);
+
+       ssize_t size = 0;
+       char *buf = msg->body();
+
+       /* header */
+       size = m_socket->send(reinterpret_cast<void *>(msg->header()),
+                                  sizeof(message_header), true);
+       retv_if(size <= 0, false);
+       retv_if(msg->size() <= 0, true);
+
+       /* body */
+       size = m_socket->send(buf, msg->size(), true);
+       retv_if(size <= 0, false);
+
+       return true;
+}
+
+bool channel::read(void)
+{
+       retv_if(!m_loop, false);
+
+       read_event_handler *handler = new(std::nothrow) read_event_handler(this);
+       retvm_if(!handler, false, "Failed to allocate memory");
+
+       m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
+
+       return true;
+}
+
+bool channel::read_sync(message &msg)
+{
+       message_header header;
+       ssize_t size = 0;
+       char buf[MAX_MSG_CAPACITY];
+
+       /* header */
+       size = m_socket->recv(&header, sizeof(message_header), true);
+       retv_if(size <= 0, false);
+
+       /* check error from header */
+       if (m_handler && header.err != 0) {
+               m_handler->error_caught(this, header.err);
+               msg.header()->err = header.err;
+               return true;
+       }
+
+       /* body */
+       if (header.length > 0) {
+               size = m_socket->recv(&buf, header.length, true);
+               retv_if(size <= 0, false);
+       }
+
+       buf[header.length] = '\0';
+       msg.enclose(reinterpret_cast<const void *>(buf), header.length);
+       msg.set_type(header.type);
+       msg.header()->err = header.err;
+
+       if (m_handler)
+               m_handler->read(this, msg);
+
+       return true;
+}
+
+bool channel::is_connected(void)
+{
+       return m_connected.load();
+}
+
+bool channel::set_option(int type, int value)
+{
+       switch (type) {
+       case SO_SNDBUF:
+               m_socket->set_buffer_size(type, value);
+               break;
+       case SO_RCVBUF:
+               m_socket->set_buffer_size(type, value);
+               break;
+       default:
+               break;
+       }
+
+       return true;
+}
+
+bool channel::get_option(int type, int &value) const
+{
+       switch (type) {
+       case 0:
+               value = m_socket->get_current_buffer_size();
+               break;
+       case SO_SNDBUF:
+               value = m_socket->get_buffer_size(type);
+               break;
+       case SO_RCVBUF:
+               value = m_socket->get_buffer_size(type);
+               break;
+       default:
+               break;
+       }
+
+       return true;
+}
+
+int channel::get_fd(void) const
+{
+       return m_fd;
+}
+
+void channel::set_event_id(uint64_t id)
+{
+       m_event_id = id;
+}
diff --git a/src/shared/channel.h b/src/shared/channel.h
new file mode 100644 (file)
index 0000000..e37a7a9
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * sensord
+ *
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef __CHANNEL_H__
+#define __CHANNEL_H__
+
+#include <unistd.h>
+#include <atomic>
+
+#include "socket.h"
+#include "message.h"
+#include "event_loop.h"
+#include "channel_handler.h"
+
+namespace ipc {
+
+class channel_handler;
+
+class channel {
+public:
+       /* move owernership of the socket to the channel */
+       channel(socket *sock);
+       ~channel();
+
+       void bind(channel_handler *handler, event_loop *loop);
+
+       bool connect(channel_handler *handler, event_loop *loop);
+       void disconnect(void);
+
+       bool is_connected(void);
+
+       bool send(message *msg);
+       bool send_sync(message *msg);
+
+       bool read(void);
+       bool read_sync(message &msg);
+
+       bool get_option(int type, int &value) const;
+       bool set_option(int type, int value);
+
+       int  get_fd(void) const;
+       void set_event_id(uint64_t id);
+
+private:
+       int m_fd;
+       uint64_t m_event_id;
+       socket *m_socket;
+       channel_handler *m_handler;
+       event_loop *m_loop;
+
+       std::atomic<bool> m_connected;
+};
+
+}
+
+#endif /* __CHANNEL_H__ */
diff --git a/src/shared/channel_handler.h b/src/shared/channel_handler.h
new file mode 100644 (file)
index 0000000..b420fbd
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * sensord
+ *
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef __CHANNEL_HANDLER_H__
+#define __CHANNEL_HANDLER_H__
+
+namespace ipc {
+
+class channel;
+class message;
+
+class channel_handler {
+public:
+       virtual ~channel_handler() {}
+
+       virtual void connected(channel *ch) = 0;
+       virtual void disconnected(channel *ch) = 0;
+       virtual void read(channel *ch, message &msg) = 0;
+       virtual void read_complete(channel *ch) = 0;
+       virtual void error_caught(channel *ch, int error) = 0;
+};
+
+}
+
+#endif /* __CHANNEL_HANDLER_H__ */