4 * Copyright (c) 2017 Samsung Electronics Co., Ltd.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
26 #include "sensor_log.h"
27 #include "channel_event_handler.h"
29 #define SYSTEMD_SOCK_BUF_SIZE (128*1024)
33 class send_event_handler : public event_handler
36 send_event_handler(channel *ch, std::shared_ptr<message> msg)
41 bool handle(int fd, event_condition condition)
43 if (!m_ch || !m_ch->is_connected())
46 if (condition & (EVENT_IN | EVENT_HUP))
49 if (!m_ch->send_sync(*m_msg))
57 std::shared_ptr<message> m_msg;
60 class read_event_handler : public event_handler
63 read_event_handler(channel *ch)
67 bool handle(int fd, event_condition condition)
71 if (!m_ch || !m_ch->is_connected())
74 if (condition & (EVENT_OUT | EVENT_HUP))
77 if (!m_ch->read_sync(msg, false))
87 channel::channel(socket *sock)
88 : m_fd(sock->get_fd())
100 _D("Destroyed[%llu]", m_event_id);
104 uint64_t channel::bind(void)
107 m_event_id = m_loop->add_event(m_socket->get_fd(),
108 (EVENT_IN | EVENT_HUP | EVENT_NVAL),
109 dynamic_cast<channel_event_handler *>(m_handler));
111 _D("Bound[%llu]", m_event_id);
115 uint64_t channel::bind(channel_handler *handler, event_loop *loop, bool loop_bind)
119 m_connected.store(true);
122 m_handler->connected(this);
130 uint64_t channel::connect(channel_handler *handler, event_loop *loop, bool loop_bind)
132 if (!m_socket->connect())
135 bind(handler, loop, loop_bind);
137 _D("Connected[%llu]", m_event_id);
141 void channel::disconnect(void)
143 if (!is_connected()) {
144 _D("Channel is not connected");
148 m_connected.store(false);
150 _D("Disconnecting..[%llu]", m_event_id);
153 m_handler->disconnected(this);
158 _D("Remove event[%llu]", m_event_id);
159 m_loop->remove_event(m_event_id, true);
165 _D("Release socket[%d]", m_socket->get_fd());
173 bool channel::send(std::shared_ptr<message> msg)
176 int cur_buffer_size = 0;
178 retv_if(!m_loop, false);
180 while (retry_cnt < 3) {
181 cur_buffer_size = m_socket->get_current_buffer_size();
182 if (cur_buffer_size <= SYSTEMD_SOCK_BUF_SIZE)
187 retvm_if(retry_cnt >= 3, false, "Socket buffer[%d] is exceeded", cur_buffer_size);
189 send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
190 retvm_if(!handler, false, "Failed to allocate memory");
192 if (m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL) , handler) == 0) {
193 _D("Failed to add send event handler");
201 bool channel::send_sync(message &msg)
203 retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
206 char *buf = msg.body();
209 size = m_socket->send(reinterpret_cast<void *>(msg.header()),
210 sizeof(message_header), true);
211 retvm_if(size <= 0, false, "Failed to send header");
213 /* if body size is zero, skip to send body message */
214 retv_if(msg.size() == 0, true);
217 size = m_socket->send(buf, msg.size(), true);
218 retvm_if(size <= 0, false, "Failed to send body");
223 bool channel::read(void)
225 retv_if(!m_loop, false);
227 read_event_handler *handler = new(std::nothrow) read_event_handler(this);
228 retvm_if(!handler, false, "Failed to allocate memory");
230 if (m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler) == 0) {
231 _D("Failed to add read event handler");
239 bool channel::read_sync(message &msg, bool select)
241 message_header header;
243 char buf[MAX_MSG_CAPACITY];
246 size = m_socket->recv(&header, sizeof(message_header), select);
247 retv_if(size <= 0, false);
249 /* check error from header */
250 if (m_handler && header.err != 0) {
251 m_handler->error_caught(this, header.err);
252 msg.header()->err = header.err;
257 if (header.length >= MAX_MSG_CAPACITY) {
258 _E("header.length error %u", header.length);
262 if (header.length > 0) {
263 size = m_socket->recv(&buf, header.length, select);
264 retv_if(size <= 0, false);
267 buf[header.length] = '\0';
268 msg.enclose(reinterpret_cast<const void *>(buf), header.length);
269 msg.set_type(header.type);
270 msg.header()->err = header.err;
273 m_handler->read(this, msg);
278 bool channel::is_connected(void)
280 return m_connected.load();
283 bool channel::set_option(int type, int value)
287 m_socket->set_buffer_size(type, value);
290 m_socket->set_buffer_size(type, value);
299 bool channel::get_option(int type, int &value) const
303 value = m_socket->get_current_buffer_size();
306 value = m_socket->get_buffer_size(type);
309 value = m_socket->get_buffer_size(type);
318 int channel::get_fd(void) const