4 * Copyright (c) 2017 Samsung Electronics Co., Ltd.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include "sensor_log.h"
28 #include "channel_event_handler.h"
/*
 * Threshold that channel::send() compares against the value returned by
 * m_socket->get_current_buffer_size(); a larger value is reported as
 * "Socket buffer[%d] is exceeded".
 * Presumably expressed in bytes (128 KiB) - TODO confirm against socket.
 */
30 #define SYSTEMD_SOCK_BUF_SIZE (128*1024)
/*
 * Event handler that channel::send() allocates and hands to the event loop
 * (via add_event) so that one message is sent through the owning channel.
 *
 * NOTE(review): braces, member initialisers and return statements of this
 * class are not visible in this view; the comments below describe only the
 * statements that are.
 */
34 class send_event_handler : public event_handler
/* Receives the owning channel and the message to be sent later by handle(). */
37 send_event_handler(channel *ch, std::shared_ptr<message> msg)
/*
 * Handler callback - presumably invoked by the event loop with the file
 * descriptor and the condition bits that fired.
 */
42 bool handle(int fd, event_condition condition)
/* This event is now being handled: drop its id from the channel's pending list. */
48 m_ch->remove_pending_event_id(m_event_id);
/* Nothing can be sent on a channel that is no longer connected. */
50 if (!m_ch->is_connected()) {
/*
 * Tests for EVENT_IN or EVENT_HUP. The branch body is not visible here;
 * presumably the send is abandoned in that case - TODO confirm.
 */
54 if (condition & (EVENT_IN | EVENT_HUP)) {
/* Perform the send through channel::send_sync(); failure branch not visible. */
58 if (!m_ch->send_sync(*m_msg)) {
/* Message handed over by channel::send(), held until handle() sends it. */
67 std::shared_ptr<message> m_msg;
/*
 * Event handler that channel::read() allocates and hands to the event loop
 * (via add_event) so that one message is read through the owning channel.
 *
 * NOTE(review): braces, member initialisers, the declaration of `msg` and the
 * return statements of this class are not visible in this view.
 */
70 class read_event_handler : public event_handler
/* Receives the owning channel whose read_sync() is called from handle(). */
73 read_event_handler(channel *ch)
/*
 * Handler callback - presumably invoked by the event loop with the file
 * descriptor and the condition bits that fired.
 */
77 bool handle(int fd, event_condition condition)
/* This event is now being handled: drop its id from the channel's pending list. */
83 m_ch->remove_pending_event_id(m_event_id);
/* Nothing can be read from a channel that is no longer connected. */
85 if (!m_ch->is_connected()) {
/*
 * Tests for EVENT_OUT or EVENT_HUP. The branch body is not visible here;
 * presumably the read is abandoned in that case - TODO confirm.
 */
89 if (condition & (EVENT_OUT | EVENT_HUP)) {
/*
 * Read one message through channel::read_sync(); `false` is the value of its
 * `select` parameter. The failure branch is not visible in this view.
 */
94 if (!m_ch->read_sync(msg, false)) {
/*
 * Constructs a channel on top of an existing socket.
 *
 * @param sock  Socket to wrap. It is dereferenced unconditionally in the
 *              initialiser list, so it must not be null.
 *
 * NOTE(review): only the m_fd initialiser is visible in this view; the
 * remaining member initialisers are not shown.
 */
105 channel::channel(socket *sock)
106 : m_fd(sock->get_fd())
/* Trace the address of the newly created channel. */
113 _D("Create[%p]", this);
/*
 * NOTE(review): the signature line of this block is not visible in this view;
 * judging by the "Destroy" trace this is presumably the body of
 * channel::~channel() - TODO confirm.
 */
118 _D("Destroy[%p]", this);
/*
 * A still-connected channel is handled here; the branch body is not visible
 * (presumably it calls disconnect()).
 */
119 if (is_connected()) {
/*
 * Registers the channel's socket with the event loop and remembers the
 * resulting event id in m_event_id.
 *
 * The socket fd is added for EVENT_IN | EVENT_HUP | EVENT_NVAL, with m_handler
 * (cast to channel_event_handler) as the event handler.
 *
 * @return uint64_t; the return statement is not visible in this view
 *         (presumably m_event_id - TODO confirm).
 *
 * NOTE(review): dynamic_cast yields a null pointer when m_handler is not a
 * channel_event_handler, and no check of the result is visible here.
 */
124 uint64_t channel::bind(void)
127 m_event_id = m_loop->add_event(m_socket->get_fd(),
128 (EVENT_IN | EVENT_HUP | EVENT_NVAL),
129 dynamic_cast<channel_event_handler *>(m_handler));
/*
 * NOTE(review): if m_event_id is a uint64_t, "%llu" does not match on
 * platforms where uint64_t is unsigned long; PRIu64 would be portable.
 */
131 _D("Bind channel[%p] : handler[%p] event_id[%llu]", this, m_handler, m_event_id);
/*
 * Binds the channel to a handler and an event loop and marks it connected.
 *
 * @param handler    Channel handler to notify about channel events.
 * @param loop       Event loop the channel works with.
 * @param loop_bind  Not used by any statement visible in this view; presumably
 *                   selects whether bind(void) is also called - TODO confirm.
 * @return uint64_t; the return statement is not visible in this view.
 *
 * NOTE(review): the statements that store `handler` and `loop` are not
 * visible here.
 */
135 uint64_t channel::bind(channel_handler *handler, event_loop *loop, bool loop_bind)
/* From this point is_connected() reports true. */
139 m_connected.store(true);
/* Tell the handler that this channel is connected (any guard is not visible). */
142 m_handler->connected(this);
/*
 * Connects the underlying socket and then binds the channel through
 * bind(handler, loop, loop_bind).
 *
 * @param handler    Forwarded to bind().
 * @param loop       Forwarded to bind().
 * @param loop_bind  Forwarded to bind().
 * @return uint64_t; the return statements are not visible in this view.
 */
150 uint64_t channel::connect(channel_handler *handler, event_loop *loop, bool loop_bind)
/* Socket-level connect failed; the branch body is not visible in this view. */
152 if (!m_socket->connect())
155 bind(handler, loop, loop_bind);
157 _D("Connect channel[%p] : event id[%llu]", this, m_event_id);
/*
 * Tears the channel down: clears the connected flag, notifies the handler,
 * removes this channel's events from the event loop and logs the release of
 * the socket.
 *
 * NOTE(review): the guards around the handler and event-loop sections, and the
 * statement that actually releases the socket, are not visible in this view.
 */
161 void channel::disconnect(void)
/* Already disconnected: only trace (branch exit not visible, presumably returns). */
164 if (!is_connected()) {
165 _D("Channel[%p] is not connected", this);
/* Clear the flag first so that is_connected() reports false during teardown. */
169 m_connected.store(false);
171 _D("Disconnect channel[%p]", this);
/* Notify the handler that this channel is gone. */
174 _D("Disconnect channel[%p] handler[%p]", this, m_handler);
175 m_handler->disconnected(this);
/* Remove every send/read event that was queued but has not been handled yet. */
180 for(auto id : m_pending_event_id) {
181 _D("Remove channel[%p] pending event id[%llu]", this, id);
182 m_loop->remove_event(id, true);
/* Remove the event stored in m_event_id (set by bind(void)). */
184 _D("Remove channel[%p] event[%llu]",this, m_event_id);
185 m_loop->remove_event(m_event_id, true);
190 _D("Release channel[%p] socket[%d]", this, m_socket->get_fd());
194 _D("Channel[%p] is disconnected", this);
/*
 * Queues a message for sending: a send_event_handler is allocated for it and
 * registered with the event loop for EVENT_OUT | EVENT_HUP | EVENT_NVAL.
 *
 * @param msg  Message to send; shared with the handler.
 * @return false when there is no event loop, when the socket buffer stays
 *         above SYSTEMD_SOCK_BUF_SIZE, or when the handler cannot be
 *         allocated. The remaining return statements are not visible here.
 *
 * NOTE(review): the declaration and increment of retry_cnt and whatever
 * happens between retries are not visible in this view.
 */
197 bool channel::send(std::shared_ptr<message> msg)
200 int cur_buffer_size = 0;
/* Sending needs an event loop. */
202 retv_if(!m_loop, false);
/* Check the socket's current buffer size up to three times. */
204 while (retry_cnt < 3) {
205 cur_buffer_size = m_socket->get_current_buffer_size();
/* Within the threshold (branch body not visible, presumably leaves the loop). */
206 if (cur_buffer_size <= SYSTEMD_SOCK_BUF_SIZE)
/* All three checks were over the threshold: refuse to queue the message. */
211 retvm_if(retry_cnt >= 3, false, "Socket buffer[%d] is exceeded", cur_buffer_size);
/* nothrow new: allocation failure yields a null pointer instead of throwing. */
213 send_event_handler *handler = new(std::nothrow) send_event_handler(this, msg);
214 retvm_if(!handler, false, "Failed to allocate memory");
216 uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_OUT | EVENT_HUP | EVENT_NVAL), handler);
/*
 * NOTE(review): the failure condition and the fate of `handler` on this path
 * are not visible here - confirm that it is deleted when add_event fails.
 */
218 _D("Failed to add send event handler");
/* Remember the id so that disconnect() can remove the event if it never fires. */
223 m_pending_event_id.push_back(event_id);
/*
 * Sends a message over the socket: first the message_header, then (when
 * msg.size() is non-zero) the body.
 *
 * @param msg  Message to send.
 * @return false when sending the header or the body reports size <= 0;
 *         true when the body is empty after the header was sent. The other
 *         return statements are not visible in this view.
 *
 * NOTE(review): the declaration of `size` is not visible here.
 */
227 bool channel::send_sync(message &msg)
/* Not connected (branch exit not visible in this view). */
230 if (!is_connected()) {
231 _D("Channel is not connected");
/*
 * NOTE(review): a message of MAX_MSG_CAPACITY or more is not sent, yet `true`
 * is returned - confirm that this is deliberate. The log text also misspells
 * "Invalid".
 */
235 retvm_if(msg.size() >= MAX_MSG_CAPACITY, true, "Invaild message size[%u]", msg.size());
238 char *buf = msg.body();
/* Header first; the meaning of the trailing `true` argument is defined by socket::send(). */
241 size = m_socket->send(reinterpret_cast<void *>(msg.header()),
242 sizeof(message_header), true);
243 retvm_if(size <= 0, false, "Failed to send header");
245 /* if body size is zero, skip to send body message */
246 retv_if(msg.size() == 0, true);
/* Then the body, msg.size() bytes taken from msg.body(). */
249 size = m_socket->send(buf, msg.size(), true);
250 retvm_if(size <= 0, false, "Failed to send body");
/*
 * Queues a read: a read_event_handler is allocated and registered with the
 * event loop for EVENT_IN | EVENT_HUP | EVENT_NVAL.
 *
 * @return false when there is no event loop or when the handler cannot be
 *         allocated. The remaining return statements are not visible here.
 */
255 bool channel::read(void)
/* Reading needs an event loop. */
257 retv_if(!m_loop, false);
/* nothrow new: allocation failure yields a null pointer instead of throwing. */
259 read_event_handler *handler = new(std::nothrow) read_event_handler(this);
260 retvm_if(!handler, false, "Failed to allocate memory");
262 uint64_t event_id = m_loop->add_event(m_socket->get_fd(), (EVENT_IN | EVENT_HUP | EVENT_NVAL), handler);
/*
 * NOTE(review): the failure condition and the fate of `handler` on this path
 * are not visible here - confirm that it is deleted when add_event fails.
 */
264 _D("Failed to add read event handler");
/* Remember the id so that disconnect() can remove the event if it never fires. */
269 m_pending_event_id.push_back(event_id);
/*
 * Receives one message from the socket: the message_header first, then
 * header.length bytes of body, and stores the result in `msg`.
 *
 * @param msg     Output message; receives the body, the type and the error
 *                code taken from the received header.
 * @param select  Forwarded as the last argument of socket::recv().
 * @return bool; the return statements are not visible in this view.
 *
 * NOTE(review): the declaration of `size` and the checks on the recv()
 * results are not visible here.
 */
273 bool channel::read_sync(message &msg, bool select)
/* Not connected (branch exit not visible in this view). */
276 if (!is_connected()) {
277 _D("Channel is not connected");
281 message_header header;
/* NOTE(review): MAX_MSG_CAPACITY bytes of automatic storage per call. */
283 char buf[MAX_MSG_CAPACITY];
286 size = m_socket->recv(&header, sizeof(message_header), select);
293 /* check error from header */
/* A non-zero error code is reported to the handler and copied into msg. */
294 if (m_handler && header.err != 0) {
295 m_handler->error_caught(this, header.err);
296 msg.header()->err = header.err;
/*
 * Lengths of MAX_MSG_CAPACITY or more are treated as an error (branch exit not
 * visible, presumably returns); this keeps buf[header.length] below in bounds.
 */
301 if (header.length >= MAX_MSG_CAPACITY) {
302 _E("header.length error %u", header.length);
306 if (header.length > 0) {
/* &buf is a pointer to the whole array; it has the same address as buf. */
307 size = m_socket->recv(&buf, header.length, select);
/* Terminate the received body with a NUL byte. */
316 buf[header.length] = '\0';
317 msg.enclose(reinterpret_cast<const void *>(buf), header.length);
318 msg.set_type(header.type);
319 msg.header()->err = header.err;
/*
 * NOTE(review): m_handler is null-checked above, so it may be null; the guard
 * for this call is not visible in this view - confirm one exists.
 */
322 m_handler->read(this, msg);
/*
 * Reports whether the channel is currently connected.
 *
 * @return The value of m_connected, which bind(handler, loop, loop_bind) sets
 *         to true and disconnect() sets to false.
 */
327 bool channel::is_connected(void)
329 return m_connected.load();
/*
 * Sets a socket option on the underlying socket.
 *
 * @param type   Option selector; also forwarded to socket::set_buffer_size().
 * @param value  Value to apply.
 * @return bool; the return statements are not visible in this view.
 *
 * NOTE(review): the selection logic (presumably a switch on `type`) is not
 * visible here; both visible branches forward to set_buffer_size(type, value).
 */
332 bool channel::set_option(int type, int value)
336 m_socket->set_buffer_size(type, value);
339 m_socket->set_buffer_size(type, value);
/*
 * Reads a socket option from the underlying socket.
 *
 * @param type   Option selector; forwarded to socket::get_buffer_size() in two
 *               of the visible branches.
 * @param value  Output; receives the option value.
 * @return bool; the return statements are not visible in this view.
 *
 * NOTE(review): the selection logic (presumably a switch on `type`) is not
 * visible here.
 */
348 bool channel::get_option(int type, int &value) const
/* This branch reports the socket's current buffer size rather than a configured one. */
352 value = m_socket->get_current_buffer_size();
355 value = m_socket->get_buffer_size(type);
358 value = m_socket->get_buffer_size(type);
/*
 * Returns a file descriptor for this channel.
 * NOTE(review): the body is not visible in this view; presumably it returns
 * m_fd, which the constructor initialises from sock->get_fd() - TODO confirm.
 */
367 int channel::get_fd(void) const
/*
 * Removes an event id from the list of pending (queued but not yet handled)
 * send/read events.
 *
 * @param id  Event id to remove. Only the first matching entry is erased, and
 *            nothing happens when the id is not in the list.
 */
372 void channel::remove_pending_event_id(uint64_t id)
374 auto it = std::find(m_pending_event_id.begin(), m_pending_event_id.end(), id);
375 if (it != m_pending_event_id.end()) {
376 m_pending_event_id.erase(it);