Change the socket to non-blocking mode. 22/133122/4
authorjusung son <jusung07.son@samsung.com>
Fri, 9 Jun 2017 04:28:55 +0000 (13:28 +0900)
committerjusung son <jusung07.son@samsung.com>
Wed, 14 Jun 2017 04:02:37 +0000 (13:02 +0900)
 - In blocking mode, the sender is blocked when the buffer of the receiver is full

 - Related patch
  [amd] https://review.tizen.org/gerrit/#/c/133124/

Change-Id: I503ae45db0dd04b9b267af7e08558b00386fa69d
Signed-off-by: jusung son <jusung07.son@samsung.com>
src/message-port.c
src/message_port.c [changed mode: 0644->0755]

index 6868a5e..53a0f73 100755 (executable)
@@ -34,6 +34,9 @@
 #include <aul.h>
 #include <gio/gio.h>
 #include <gio/gunixfdlist.h>
+#include <pthread.h>
+#include <glib-unix.h>
+#include <poll.h>
 
 #include "message-port.h"
 #include "message-port-log.h"
@@ -135,6 +138,9 @@ typedef struct port_list_info {
        bool exist;
        GIOChannel *gio_read;
        int g_src_id;
+       GList *delayed_message_list;
+       unsigned int delayed_message_size;
+       int delay_src_id;
 } port_list_info_s;
 
 typedef struct registered_callback_info {
@@ -147,6 +153,34 @@ typedef struct registered_callback_info {
        messageport_registration_event_cb unregistered_cb;
 } registered_callback_info_s;
 
+enum transmission_sequence {
+       SEQUENCE_START = 0,
+       SEQUENCE_PORT_LEN,
+       SEQUENCE_PORT_NAME,
+       SEQUENCE_BIDIRECTION,
+       SEQUENCE_TRUSTED,
+       SEQUENCE_DTAT_LEN,
+       SEQUENCE_DATA,
+       SEQUENCE_END
+};
+
+typedef struct delay_message {
+       unsigned int size;
+       unsigned int sent_bytes;
+       int sequence;
+       int local_port_len;
+       char *local_port_name;
+       bool is_bidirection;
+       bool local_trusted;
+       int data_len;
+       bundle_raw *data;
+} delay_message_info_s;
+
+
+extern pthread_mutex_t mutex;
+static void __free_list_delay_message_info(gpointer data);
+
+
 static void __callback_info_free(gpointer data)
 {
        message_port_callback_info_s *callback_info = (message_port_callback_info_s *)data;
@@ -466,6 +500,19 @@ static void __clear_disconnect_socket(port_list_info_s *port_info)
                g_source_remove(port_info->g_src_id);
                port_info->g_src_id = 0;
        }
+
+       if (port_info->delay_src_id != 0) {
+               g_source_remove(port_info->delay_src_id);
+               port_info->delay_src_id = 0;
+       }
+
+       if (port_info->delayed_message_list != NULL) {
+               g_list_free_full(port_info->delayed_message_list, __free_list_delay_message_info);
+               /* can be reused */
+               port_info->delayed_message_list = NULL;
+       }
+
+       port_info->delayed_message_size = 0;
        port_info->send_sock_fd = 0;
 }
 
@@ -604,13 +651,30 @@ out:
 static int __write_socket(int fd,
                const char *buffer,
                unsigned int nbytes,
-               unsigned int *bytes_write)
+               unsigned int *bytes_write,
+               int *sequence)
 {
+#define SEND_TIMEOUT 500 /* milliseconds */
+
        unsigned int left = nbytes;
        ssize_t nb;
        int retry_cnt = 0;
+       struct pollfd fds[1];
+       int ret;
 
+       *sequence += 1;
        *bytes_write = 0;
+
+       fds[0].fd = fd;
+       fds[0].events = POLLOUT;
+       fds[0].revents = 0;
+
+       ret = poll(fds, 1, SEND_TIMEOUT);
+       if (ret == 0) {
+               LOGE("__write_socket: : fd %d poll timeout", fd);
+               return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+       }
+
        while (left && (retry_cnt < MAX_RETRY_CNT)) {
                nb = write(fd, buffer, left);
                if (nb == -1) {
@@ -620,6 +684,10 @@ static int __write_socket(int fd,
                                continue;
                        }
                        LOGE("__write_socket: ...error fd %d: errno %d\n", fd, errno);
+
+                       if (errno == EWOULDBLOCK || errno == EAGAIN)
+                               return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+
                        return MESSAGEPORT_ERROR_IO_ERROR;
                }
 
@@ -631,20 +699,31 @@ static int __write_socket(int fd,
        return MESSAGEPORT_ERROR_NONE;
 }
 
-static int __write_string_to_socket(int fd, const char *buffer, int string_len)
+static int __write_string_to_socket(int fd,
+               const char *buffer,
+               int string_len,
+               unsigned int *bytes_write,
+               int *sequence)
 {
-       unsigned int nb;
-       if (__write_socket(fd, (char *)&string_len, sizeof(string_len), &nb) != MESSAGEPORT_ERROR_NONE) {
+       int ret;
+
+       ret = __write_socket(fd, (char *)&string_len, sizeof(string_len),
+                       bytes_write, sequence);
+       if (ret != MESSAGEPORT_ERROR_NONE) {
                _LOGE("write string_len fail");
-               return MESSAGEPORT_ERROR_IO_ERROR;
+               return ret;
        }
 
        if (string_len > 0) {
-               if (__write_socket(fd, buffer, string_len, &nb) != MESSAGEPORT_ERROR_NONE) {
+               ret = __write_socket(fd, buffer, string_len, bytes_write, sequence);
+               if (ret != MESSAGEPORT_ERROR_NONE) {
                        _LOGE("wirte buffer fail");
-                       return MESSAGEPORT_ERROR_IO_ERROR;
+                       return ret;
                }
+       } else {
+               *sequence += 1;
        }
+
        return MESSAGEPORT_ERROR_NONE;
 }
 
@@ -1420,19 +1499,271 @@ static int __register_message_port(const char *local_port, bool is_trusted, mess
        return local_id;
 }
 
-int __message_port_send_async(int sockfd, bundle *kb, const char *local_port,
+static void __free_delay_message_info(delay_message_info_s *message)
+{
+       if (message != NULL) {
+               FREE_AND_NULL(message->local_port_name);
+               FREE_AND_NULL(message->data);
+               FREE_AND_NULL(message);
+       }
+}
+
+static void __free_list_delay_message_info(gpointer data)
+{
+       delay_message_info_s *message = (delay_message_info_s *)data;
+
+       if (message != NULL)
+               __free_delay_message_info(message);
+}
+
+static int __send_delayed_message(int sockfd, delay_message_info_s *message)
+{
+       unsigned int nb = 0;
+       int sequence = message->sequence - 1;
+       int ret = MESSAGEPORT_ERROR_NONE;
+       bool is_startline = true;
+       int offset = 0;
+
+       _LOGI("send_delayed_message : sockfd (%d) sequence(%d) sent byte(%d)",
+               sockfd, message->sequence, message->sent_bytes);
+
+       switch (message->sequence) {
+       case SEQUENCE_START:
+               sequence++;
+               is_startline = false;
+
+       case SEQUENCE_PORT_LEN:
+               if (is_startline)
+                       offset = message->sent_bytes;
+
+               ret = __write_socket(sockfd, ((char *)&message->local_port_len) + offset,
+                               sizeof(message->local_port_len) - offset, &nb, &sequence);
+               if (ret != MESSAGEPORT_ERROR_NONE) {
+                       _LOGE("write local_port_len fail");
+                       goto out;
+               }
+               offset = 0;
+               is_startline = false;
+
+       case SEQUENCE_PORT_NAME:
+               if (is_startline)
+                       offset = message->sent_bytes;
+
+               if (message->local_port_len > 0)
+                       ret = __write_socket(sockfd, message->local_port_name + offset,
+                               message->local_port_len - offset , &nb, &sequence);
+               else
+                       sequence++;
+
+               if (ret != MESSAGEPORT_ERROR_NONE) {
+                       _LOGE("write local_port fail");
+                       goto out;
+               }
+               offset = 0;
+               is_startline = false;
+
+       case SEQUENCE_BIDIRECTION:
+               if (is_startline)
+                       offset = message->sent_bytes;
+
+               ret = __write_socket(sockfd, ((char *)&message->is_bidirection) + offset,
+                               sizeof(message->is_bidirection) - offset, &nb, &sequence);
+               if (ret != MESSAGEPORT_ERROR_NONE) {
+                       _LOGE("write is_bidirection fail");
+                       goto out;
+               }
+               offset = 0;
+               is_startline = false;
+
+       case SEQUENCE_TRUSTED:
+               if (is_startline)
+                       offset = message->sent_bytes;
+
+               ret = __write_socket(sockfd, ((char *)&message->local_trusted) + offset,
+                               sizeof(message->local_trusted) - offset, &nb, &sequence);
+               if (ret != MESSAGEPORT_ERROR_NONE) {
+                       _LOGE("write local_trusted fail");
+                       goto out;
+               }
+               offset = 0;
+               is_startline = false;
+
+       case SEQUENCE_DTAT_LEN:
+               if (is_startline)
+                       offset = message->sent_bytes;
+
+               ret = __write_socket(sockfd, ((char *)&message->data_len) + offset,
+                               sizeof(message->data_len) - offset, &nb, &sequence);
+               if (ret != MESSAGEPORT_ERROR_NONE) {
+                       _LOGE("write data_len fail");
+                       goto out;
+               }
+               offset = 0;
+               is_startline = false;
+
+       case SEQUENCE_DATA:
+               if (is_startline)
+                       offset = message->sent_bytes;
+
+               ret = __write_socket(sockfd, (char *)message->data + offset,
+                       message->data_len -offset, &nb, &sequence);
+
+               if (ret != MESSAGEPORT_ERROR_NONE) {
+                       _LOGE("write data fail");
+                       goto out;
+               }
+               offset = 0;
+               is_startline = false;
+
+       default:
+               ret = MESSAGEPORT_ERROR_NONE;
+
+       }
+
+out:
+       if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) {
+               if (is_startline)
+                        message->sent_bytes += nb;
+               else
+                        message->sent_bytes = nb;
+
+               message->sequence = sequence;
+               _LOGE("send_delayed_message fail : sockfd (%d) sequence(%d) sent byte(%d)",
+                       sockfd, message->sequence, message->sent_bytes);
+       }
+
+       return ret;
+
+}
+
+static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer data)
+{
+       port_list_info_s *port_info = (port_list_info_s *)data;
+       delay_message_info_s *message;
+       int ret;
+
+       if (port_info == NULL)
+               return G_SOURCE_REMOVE;
+
+       pthread_mutex_lock(&mutex);
+
+       if (port_info->delayed_message_list == NULL) {
+               port_info->delayed_message_size = 0;
+               port_info->delay_src_id = 0;
+               pthread_mutex_unlock(&mutex);
+               return G_SOURCE_REMOVE;
+       } else {
+               message = g_list_nth_data(port_info->delayed_message_list, 0);
+               ret = __send_delayed_message(port_info->send_sock_fd, message);
+
+               if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) {
+                       pthread_mutex_unlock(&mutex);
+                       return G_SOURCE_CONTINUE;
+               } else if (ret == MESSAGEPORT_ERROR_IO_ERROR) {
+                       __clear_disconnect_socket(port_info);
+                       pthread_mutex_unlock(&mutex);
+                       return G_SOURCE_REMOVE;
+               }
+
+               port_info->delayed_message_size -= message->size;
+
+               port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message);
+               __free_delay_message_info(message);
+       }
+
+       pthread_mutex_unlock(&mutex);
+
+       return G_SOURCE_CONTINUE;
+}
+
+static int __insert_delayed_message(port_list_info_s *port_info,
+       int sequence,
+       bundle_raw *kb_data,
+       int data_len,
+       unsigned int sent_bytes,
+       const char *local_port,
+       bool local_trusted,
+       bool is_bidirection)
+{
+#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB per remote port (MAX) */
+
+       unsigned int tmp_size;
+       unsigned int message_size;
+       int ret = MESSAGEPORT_ERROR_NONE;
+
+       if (port_info->delayed_message_size >= QUEUE_SIZE_MAX) {
+               _LOGE("cache fail : delayed_message_size (%d), count(%d)",
+                       port_info->delayed_message_size, g_list_length(port_info->delayed_message_list));
+               return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+       }
+
+       delay_message_info_s *message = (delay_message_info_s *)calloc(1, sizeof(delay_message_info_s));
+       retvm_if(!message, MESSAGEPORT_ERROR_OUT_OF_MEMORY, "Malloc failed");
+
+       message_size = sizeof(delay_message_info_s);
+
+       message->sequence = sequence;
+       tmp_size = strlen(local_port) + 1;
+       message_size += tmp_size;
+       message->local_port_len = tmp_size;
+       message->local_port_name = strdup(local_port);
+       if (message->local_port_name == NULL) {
+               _LOGE("local_port_name strdup fail");
+               ret = MESSAGEPORT_ERROR_OUT_OF_MEMORY;
+               goto out;
+       }
+       message->is_bidirection = is_bidirection;
+       message->local_trusted = local_trusted;
+       message_size += data_len;
+       message->data_len = data_len;
+       message->data = (bundle_raw *)strdup((const char *)kb_data);
+       if (message->data == NULL) {
+               _LOGE("data strdup fail");
+               ret = MESSAGEPORT_ERROR_OUT_OF_MEMORY;
+               goto out;
+       }
+
+
+       message->sent_bytes = sent_bytes;
+       message->size = message_size;
+       port_info->delayed_message_size += message_size;
+
+       port_info->delayed_message_list = g_list_append(port_info->delayed_message_list, message);
+
+       if (port_info->delay_src_id == 0) {
+                       port_info->delay_src_id = g_unix_fd_add_full(G_PRIORITY_DEFAULT,
+                                                       port_info->send_sock_fd, G_IO_OUT, __process_delayed_message,
+                                                       port_info, NULL);
+       }
+
+       _LOGE("inserted : pm(%s) fd(%d) ms(%d) ds(%d) dlc(%d) sqn(%d) sb (%d)",
+               port_info->port_name, port_info->send_sock_fd, message_size,
+               port_info->delayed_message_size,
+               g_list_length(port_info->delayed_message_list), sequence, sent_bytes);
+
+
+
+out:
+       if (ret != MESSAGEPORT_ERROR_NONE)
+               __free_delay_message_info(message);
+
+       return ret;
+}
+
+int __message_port_send_async(port_list_info_s *port_info, bundle *kb, const char *local_port,
                bool local_trusted, bool is_bidirection)
 {
        int ret = 0;
        int data_len;
        int local_port_len = 0;
-       unsigned int nb;
+       unsigned int nb = 0;
        bundle_raw *kb_data = NULL;
+       int sequence = SEQUENCE_START;
 
        bundle_encode(kb, &kb_data, &data_len);
        if (kb_data == NULL) {
                _LOGE("bundle encode fail");
-               ret = MESSAGEPORT_ERROR_IO_ERROR;
+               ret = MESSAGEPORT_ERROR_INVALID_PARAMETER;
                goto out;
        }
 
@@ -1442,33 +1773,52 @@ int __message_port_send_async(int sockfd, bundle *kb, const char *local_port,
                goto out;
        }
 
+       if (g_list_length(port_info->delayed_message_list) > 0) {
+               ret = MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+               _LOGE("There are messages in the delayed_message_list (count %d)",
+                       g_list_length(port_info->delayed_message_list));
+               goto out;
+       }
+
        if (local_port != NULL)
                local_port_len = strlen(local_port) + 1;
 
-       if (__write_string_to_socket(sockfd, local_port, local_port_len) != MESSAGEPORT_ERROR_NONE) {
+       ret = __write_string_to_socket(port_info->send_sock_fd, local_port,
+                       local_port_len, &nb, &sequence);
+       if (ret != MESSAGEPORT_ERROR_NONE) {
                _LOGE("write local_port fail");
-               ret = MESSAGEPORT_ERROR_IO_ERROR;
                goto out;
        }
 
-       if (__write_socket(sockfd, (char *)&is_bidirection, sizeof(is_bidirection), &nb) != MESSAGEPORT_ERROR_NONE) {
+       ret = __write_socket(port_info->send_sock_fd, (char *)&is_bidirection,
+                       sizeof(is_bidirection), &nb, &sequence);
+       if (ret != MESSAGEPORT_ERROR_NONE) {
                _LOGE("write is_bidirection fail");
-               ret = MESSAGEPORT_ERROR_IO_ERROR;
                goto out;
        }
 
-       if (__write_socket(sockfd, (char *)&local_trusted, sizeof(local_trusted), &nb) != MESSAGEPORT_ERROR_NONE) {
+       ret = __write_socket(port_info->send_sock_fd, (char *)&local_trusted,
+                       sizeof(local_trusted), &nb, &sequence);
+       if (ret != MESSAGEPORT_ERROR_NONE) {
                _LOGE("write local_trusted fail");
-               ret = MESSAGEPORT_ERROR_IO_ERROR;
                goto out;
        }
 
-       if (__write_string_to_socket(sockfd, (void *)kb_data, data_len) != MESSAGEPORT_ERROR_NONE) {
+       ret = __write_string_to_socket(port_info->send_sock_fd, (void *)kb_data,
+                       data_len, &nb, &sequence);
+       if (ret != MESSAGEPORT_ERROR_NONE) {
                _LOGE("write kb_data fail");
-               ret = MESSAGEPORT_ERROR_IO_ERROR;
                goto out;
        }
+
 out:
+       if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) {
+               ret = __insert_delayed_message(port_info, sequence, kb_data, data_len, nb,
+                       local_port, local_trusted, is_bidirection);
+               if (ret != MESSAGEPORT_ERROR_NONE)
+                       ret = MESSAGEPORT_ERROR_IO_ERROR;
+       }
+
        if (kb_data)
                free(kb_data);
 
@@ -1512,7 +1862,7 @@ static int __message_port_send_message(const char *remote_appid, const char *rem
        }
 
        if (port_info->send_sock_fd > 0) {
-               ret = __message_port_send_async(port_info->send_sock_fd, message,
+               ret = __message_port_send_async(port_info, message,
                                (local_port) ? local_port : "", local_trusted, bi_dir);
        } else {
 
old mode 100644 (file)
new mode 100755 (executable)
index d6d50f3..2629c35
@@ -30,7 +30,7 @@ typedef struct message_port_callback_item_s {
 
 static GHashTable *__listeners;
 static GHashTable *__trusted_listeners;
-static pthread_mutex_t __mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static void do_callback(message_port_message_cb callback, int local_port_id, const char *remote_app_id, const char *remote_port, bool trusted_remote_port, bundle *message, void *user_data)
 {
@@ -75,13 +75,13 @@ int message_port_register_local_port(const char *local_port, message_port_messag
                if (__listeners == NULL)
                        __listeners = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free);
 
-               pthread_mutex_lock(&__mutex);
+               pthread_mutex_lock(&mutex);
                message_port_callback_item *item =
                        (message_port_callback_item *)g_hash_table_lookup(__listeners, GINT_TO_POINTER(local_port_id));
                if (item == NULL) {
                        item = (message_port_callback_item *)calloc(1, sizeof(message_port_callback_item));
                        if (item == NULL) {
-                               pthread_mutex_unlock(&__mutex);
+                               pthread_mutex_unlock(&mutex);
                                return MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
                        }
 
@@ -90,7 +90,7 @@ int message_port_register_local_port(const char *local_port, message_port_messag
 
                item->callback = callback;
                item->user_data = user_data;
-               pthread_mutex_unlock(&__mutex);
+               pthread_mutex_unlock(&mutex);
 
        } else {
                _SECURE_LOGI("Register local port fail (%d).", local_port_id);
@@ -113,13 +113,13 @@ int message_port_register_trusted_local_port(const char *local_port, message_por
                if (__trusted_listeners == NULL)
                        __trusted_listeners = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free);
 
-               pthread_mutex_lock(&__mutex);
+               pthread_mutex_lock(&mutex);
                message_port_callback_item *item =
                        (message_port_callback_item *)g_hash_table_lookup(__trusted_listeners, GINT_TO_POINTER(trusted_local_port_id));
                if (item == NULL) {
                        item = (message_port_callback_item *)calloc(1, sizeof(message_port_callback_item));
                        if (item == NULL) {
-                               pthread_mutex_unlock(&__mutex);
+                               pthread_mutex_unlock(&mutex);
                                return MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
                        }
 
@@ -128,7 +128,7 @@ int message_port_register_trusted_local_port(const char *local_port, message_por
 
                item->callback = callback;
                item->user_data = user_data;
-               pthread_mutex_unlock(&__mutex);
+               pthread_mutex_unlock(&mutex);
        } else {
                _SECURE_LOGI("Register trusted local port fail (%d).", trusted_local_port_id);
        }
@@ -193,9 +193,9 @@ int message_port_send_message(const char *remote_app_id, const char *remote_port
        }
 
        _SECURE_LOGI("Send a message to (%s):(%s).", remote_app_id, remote_port);
-       pthread_mutex_lock(&__mutex);
+       pthread_mutex_lock(&mutex);
        ret = messageport_send_message(remote_app_id, remote_port, message);
-       pthread_mutex_unlock(&__mutex);
+       pthread_mutex_unlock(&mutex);
 
        return convert_to_tizen_error((messageport_error_e)ret);
 }
@@ -209,9 +209,9 @@ int message_port_send_trusted_message(const char *remote_app_id, const char *rem
        }
        _SECURE_LOGI("Send a trusted message to (%s):(%s).", remote_app_id, remote_port);
 
-       pthread_mutex_lock(&__mutex);
+       pthread_mutex_lock(&mutex);
        ret = messageport_send_trusted_message(remote_app_id, remote_port, message);
-       pthread_mutex_unlock(&__mutex);
+       pthread_mutex_unlock(&mutex);
 
        return convert_to_tizen_error((messageport_error_e)ret);
 }
@@ -244,9 +244,9 @@ int message_port_send_message_with_local_port(const char *remote_app_id, const c
        }
 
        _SECURE_LOGI("Send a message to (%s):(%s) and listen at the local port ID (%d).", remote_app_id, remote_port, local_port_id);
-       pthread_mutex_lock(&__mutex);
+       pthread_mutex_lock(&mutex);
        ret = messageport_send_bidirectional_message(local_port_id, remote_app_id, remote_port, message);
-       pthread_mutex_unlock(&__mutex);
+       pthread_mutex_unlock(&mutex);
 
        return convert_to_tizen_error((messageport_error_e)ret);
 }
@@ -277,9 +277,9 @@ int message_port_send_trusted_message_with_local_port(const char *remote_app_id,
        }
 
        _SECURE_LOGI("Send a trusted message to (%s):(%s) and listen at the local port ID (%d).", remote_app_id, remote_port, local_port_id);
-       pthread_mutex_lock(&__mutex);
+       pthread_mutex_lock(&mutex);
        ret = messageport_send_bidirectional_trusted_message(local_port_id, remote_app_id, remote_port, message);
-       pthread_mutex_unlock(&__mutex);
+       pthread_mutex_unlock(&mutex);
 
        return convert_to_tizen_error((messageport_error_e)ret);
 }