#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
+#include <poll.h>
#include <glib.h>
#include <gio/gio.h>
return true;
}
+
/* LCOV_EXCL_START */
+static int __pop_delayed_message(port_list_info_s *port_info)
+{
+ delay_message_info_s *message;
+ int ret;
+
+ if (port_info->delayed_message_list == NULL)
+ return MESSAGE_PORT_ERROR_NONE;
+
+ message = g_list_nth_data(port_info->delayed_message_list, 0);
+ ret = __send_delayed_message(port_info->send_sock_fd, message);
+
+ if (ret != MESSAGE_PORT_ERROR_NONE)
+ return ret;
+
+ port_info->delayed_message_size -= message->size;
+ port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message);
+ _LOGI("pop : delayed_message_size (%d), count(%d)",
+ port_info->delayed_message_size,
+ g_list_length(port_info->delayed_message_list));
+
+ __free_delay_message_info(message);
+
+ return MESSAGE_PORT_ERROR_NONE;
+
+}
+
static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer data)
{
delay_port_info *delay_info = (delay_port_info *)data;
port_list_info_s *port_info = delay_info->port_info;
- delay_message_info_s *message;
+
int ret;
if (port_info == NULL)
message_port_unlock_mutex();
return G_SOURCE_REMOVE;
} else {
- message = g_list_nth_data(port_info->delayed_message_list, 0);
- ret = __send_delayed_message(port_info->send_sock_fd, message);
-
+ ret = __pop_delayed_message(port_info);
if (ret == MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE) {
message_port_unlock_mutex();
return G_SOURCE_CONTINUE;
message_port_unlock_mutex();
return G_SOURCE_REMOVE;
}
-
- port_info->delayed_message_size -= message->size;
-
- port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message);
- __free_delay_message_info(message);
}
message_port_unlock_mutex();
/* LCOV_EXCL_STOP */
/* LCOV_EXCL_START */
-static int __insert_delayed_message(port_list_info_s *port_info,
+static int __push_delayed_message(port_list_info_s *port_info,
int sequence,
bundle_raw *kb_data,
int data_len,
}
/* LCOV_EXCL_STOP */
+static bool __can_write(int fd)
+{
+ struct pollfd fds[1];
+ fds[0].fd = fd;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
+ int ret = poll(fds, 1, 100);
+ if (ret == 0 || ret < 0) {
+ _LOGI("poll() is failed. fd(%d), ret(%d) error(%s)",
+ fd, ret, ret == 0 ? "timed out" : "");
+ return false;
+ }
+
+ return true;
+}
+
static int __message_port_send_async(port_list_info_s *port_info, bundle *kb, const char *local_port,
bool local_trusted, bool is_bidirection)
{
}
out:
+
if (ret == MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE) {
- ret = __insert_delayed_message(port_info, sequence, kb_data, data_len, nb,
+ ret = __push_delayed_message(port_info, sequence, kb_data, data_len, nb,
local_port, local_trusted, is_bidirection);
- if (ret != MESSAGE_PORT_ERROR_NONE)
- ret = MESSAGE_PORT_ERROR_IO_ERROR;
+ if (ret != MESSAGE_PORT_ERROR_NONE) {
+ if (kb_data)
+ free(kb_data);
+ return MESSAGE_PORT_ERROR_IO_ERROR;
+ }
+
+ if (__can_write(port_info->send_sock_fd)) {
+ while (g_list_length(port_info->delayed_message_list) != 0) {
+ ret = __pop_delayed_message(port_info);
+ if (ret != MESSAGE_PORT_ERROR_NONE) {
+ if (ret == MESSAGE_PORT_ERROR_RESOURCE_UNAVAILABLE) {
+ ret = MESSAGE_PORT_ERROR_NONE;
+ } else if (ret == MESSAGE_PORT_ERROR_IO_ERROR) {
+ g_source_remove(port_info->delay_src_id);
+ port_info->delay_src_id = 0;
+ }
+ break;
+ }
+ }
+ }
}
if (kb_data)