From 11275f7bdb407022a11a3febed5c1ddd3188c933 Mon Sep 17 00:00:00 2001 From: jusung son Date: Fri, 9 Jun 2017 13:28:55 +0900 Subject: [PATCH] Change the socket to non-blocking mode. - In blocking mode, the sender is blocked when the buffer of the receiver is full - Related patch [amd] https://review.tizen.org/gerrit/#/c/133124/ Change-Id: I503ae45db0dd04b9b267af7e08558b00386fa69d Signed-off-by: jusung son --- src/message-port.c | 388 ++++++++++++++++++++++++++++++++++++++++++++++++++--- src/message_port.c | 30 ++--- 2 files changed, 384 insertions(+), 34 deletions(-) mode change 100644 => 100755 src/message_port.c diff --git a/src/message-port.c b/src/message-port.c index 6868a5e..53a0f73 100755 --- a/src/message-port.c +++ b/src/message-port.c @@ -34,6 +34,9 @@ #include #include #include +#include +#include +#include #include "message-port.h" #include "message-port-log.h" @@ -135,6 +138,9 @@ typedef struct port_list_info { bool exist; GIOChannel *gio_read; int g_src_id; + GList *delayed_message_list; + unsigned int delayed_message_size; + int delay_src_id; } port_list_info_s; typedef struct registered_callback_info { @@ -147,6 +153,34 @@ typedef struct registered_callback_info { messageport_registration_event_cb unregistered_cb; } registered_callback_info_s; +enum transmission_sequence { + SEQUENCE_START = 0, + SEQUENCE_PORT_LEN, + SEQUENCE_PORT_NAME, + SEQUENCE_BIDIRECTION, + SEQUENCE_TRUSTED, + SEQUENCE_DTAT_LEN, + SEQUENCE_DATA, + SEQUENCE_END +}; + +typedef struct delay_message { + unsigned int size; + unsigned int sent_bytes; + int sequence; + int local_port_len; + char *local_port_name; + bool is_bidirection; + bool local_trusted; + int data_len; + bundle_raw *data; +} delay_message_info_s; + + +extern pthread_mutex_t mutex; +static void __free_list_delay_message_info(gpointer data); + + static void __callback_info_free(gpointer data) { message_port_callback_info_s *callback_info = (message_port_callback_info_s *)data; @@ -466,6 +500,19 @@ static void __clear_disconnect_socket(port_list_info_s *port_info) g_source_remove(port_info->g_src_id); port_info->g_src_id = 0; } + + if (port_info->delay_src_id != 0) { + g_source_remove(port_info->delay_src_id); + port_info->delay_src_id = 0; + } + + if (port_info->delayed_message_list != NULL) { + g_list_free_full(port_info->delayed_message_list, __free_list_delay_message_info); + /* can be reused */ + port_info->delayed_message_list = NULL; + } + + port_info->delayed_message_size = 0; port_info->send_sock_fd = 0; } @@ -604,13 +651,30 @@ out: static int __write_socket(int fd, const char *buffer, unsigned int nbytes, - unsigned int *bytes_write) + unsigned int *bytes_write, + int *sequence) { +#define SEND_TIMEOUT 500 /* milliseconds */ + unsigned int left = nbytes; ssize_t nb; int retry_cnt = 0; + struct pollfd fds[1]; + int ret; + *sequence += 1; *bytes_write = 0; + + fds[0].fd = fd; + fds[0].events = POLLOUT; + fds[0].revents = 0; + + ret = poll(fds, 1, SEND_TIMEOUT); + if (ret == 0) { + LOGE("__write_socket: : fd %d poll timeout", fd); + return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE; + } + while (left && (retry_cnt < MAX_RETRY_CNT)) { nb = write(fd, buffer, left); if (nb == -1) { @@ -620,6 +684,10 @@ static int __write_socket(int fd, continue; } LOGE("__write_socket: ...error fd %d: errno %d\n", fd, errno); + + if (errno == EWOULDBLOCK || errno == EAGAIN) + return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE; + return MESSAGEPORT_ERROR_IO_ERROR; } @@ -631,20 +699,31 @@ static int __write_socket(int fd, return MESSAGEPORT_ERROR_NONE; } -static int __write_string_to_socket(int fd, const char *buffer, int string_len) +static int __write_string_to_socket(int fd, + const char *buffer, + int string_len, + unsigned int *bytes_write, + int *sequence) { - unsigned int nb; - if (__write_socket(fd, (char *)&string_len, sizeof(string_len), &nb) != MESSAGEPORT_ERROR_NONE) { + int ret; + + ret = __write_socket(fd, (char *)&string_len, sizeof(string_len), + bytes_write, sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { _LOGE("write string_len fail"); - return MESSAGEPORT_ERROR_IO_ERROR; + return ret; } if (string_len > 0) { - if (__write_socket(fd, buffer, string_len, &nb) != MESSAGEPORT_ERROR_NONE) { + ret = __write_socket(fd, buffer, string_len, bytes_write, sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { _LOGE("wirte buffer fail"); - return MESSAGEPORT_ERROR_IO_ERROR; + return ret; } + } else { + *sequence += 1; } + return MESSAGEPORT_ERROR_NONE; } @@ -1420,19 +1499,271 @@ static int __register_message_port(const char *local_port, bool is_trusted, mess return local_id; } -int __message_port_send_async(int sockfd, bundle *kb, const char *local_port, +static void __free_delay_message_info(delay_message_info_s *message) +{ + if (message != NULL) { + FREE_AND_NULL(message->local_port_name); + FREE_AND_NULL(message->data); + FREE_AND_NULL(message); + } +} + +static void __free_list_delay_message_info(gpointer data) +{ + delay_message_info_s *message = (delay_message_info_s *)data; + + if (message != NULL) + __free_delay_message_info(message); +} + +static int __send_delayed_message(int sockfd, delay_message_info_s *message) +{ + unsigned int nb = 0; + int sequence = message->sequence - 1; + int ret = MESSAGEPORT_ERROR_NONE; + bool is_startline = true; + int offset = 0; + + _LOGI("send_delayed_message : sockfd (%d) sequence(%d) sent byte(%d)", + sockfd, message->sequence, message->sent_bytes); + + switch (message->sequence) { + case SEQUENCE_START: + sequence++; + is_startline = false; + + case SEQUENCE_PORT_LEN: + if (is_startline) + offset = message->sent_bytes; + + ret = __write_socket(sockfd, ((char *)&message->local_port_len) + offset, + sizeof(message->local_port_len) - offset, &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { + _LOGE("write local_port_len fail"); + goto out; + } + offset = 0; + is_startline = false; + + case SEQUENCE_PORT_NAME: + if (is_startline) + offset = message->sent_bytes; + + if (message->local_port_len > 0) + ret = __write_socket(sockfd, message->local_port_name + offset, + message->local_port_len - offset , &nb, &sequence); + else + sequence++; + + if (ret != MESSAGEPORT_ERROR_NONE) { + _LOGE("write local_port fail"); + goto out; + } + offset = 0; + is_startline = false; + + case SEQUENCE_BIDIRECTION: + if (is_startline) + offset = message->sent_bytes; + + ret = __write_socket(sockfd, ((char *)&message->is_bidirection) + offset, + sizeof(message->is_bidirection) - offset, &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { + _LOGE("write is_bidirection fail"); + goto out; + } + offset = 0; + is_startline = false; + + case SEQUENCE_TRUSTED: + if (is_startline) + offset = message->sent_bytes; + + ret = __write_socket(sockfd, ((char *)&message->local_trusted) + offset, + sizeof(message->local_trusted) - offset, &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { + _LOGE("write local_trusted fail"); + goto out; + } + offset = 0; + is_startline = false; + + case SEQUENCE_DTAT_LEN: + if (is_startline) + offset = message->sent_bytes; + + ret = __write_socket(sockfd, ((char *)&message->data_len) + offset, + sizeof(message->data_len) - offset, &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { + _LOGE("write data_len fail"); + goto out; + } + offset = 0; + is_startline = false; + + case SEQUENCE_DATA: + if (is_startline) + offset = message->sent_bytes; + + ret = __write_socket(sockfd, (char *)message->data + offset, + message->data_len -offset, &nb, &sequence); + + if (ret != MESSAGEPORT_ERROR_NONE) { + _LOGE("write data fail"); + goto out; + } + offset = 0; + is_startline = false; + + default: + ret = MESSAGEPORT_ERROR_NONE; + + } + +out: + if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) { + if (is_startline) + message->sent_bytes += nb; + else + message->sent_bytes = nb; + + message->sequence = sequence; + _LOGE("send_delayed_message fail : sockfd (%d) sequence(%d) sent byte(%d)", + sockfd, message->sequence, message->sent_bytes); + } + + return ret; + +} + +static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer data) +{ + port_list_info_s *port_info = (port_list_info_s *)data; + delay_message_info_s *message; + int ret; + + if (port_info == NULL) + return G_SOURCE_REMOVE; + + pthread_mutex_lock(&mutex); + + if (port_info->delayed_message_list == NULL) { + port_info->delayed_message_size = 0; + port_info->delay_src_id = 0; + pthread_mutex_unlock(&mutex); + return G_SOURCE_REMOVE; + } else { + message = g_list_nth_data(port_info->delayed_message_list, 0); + ret = __send_delayed_message(port_info->send_sock_fd, message); + + if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) { + pthread_mutex_unlock(&mutex); + return G_SOURCE_CONTINUE; + } else if (ret == MESSAGEPORT_ERROR_IO_ERROR) { + __clear_disconnect_socket(port_info); + pthread_mutex_unlock(&mutex); + return G_SOURCE_REMOVE; + } + + port_info->delayed_message_size -= message->size; + + port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message); + __free_delay_message_info(message); + } + + pthread_mutex_unlock(&mutex); + + return G_SOURCE_CONTINUE; +} + +static int __insert_delayed_message(port_list_info_s *port_info, + int sequence, + bundle_raw *kb_data, + int data_len, + unsigned int sent_bytes, + const char *local_port, + bool local_trusted, + bool is_bidirection) +{ +#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB per remote port (MAX) */ + + unsigned int tmp_size; + unsigned int message_size; + int ret = MESSAGEPORT_ERROR_NONE; + + if (port_info->delayed_message_size >= QUEUE_SIZE_MAX) { + _LOGE("cache fail : delayed_message_size (%d), count(%d)", + port_info->delayed_message_size, g_list_length(port_info->delayed_message_list)); + return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE; + } + + delay_message_info_s *message = (delay_message_info_s *)calloc(1, sizeof(delay_message_info_s)); + retvm_if(!message, MESSAGEPORT_ERROR_OUT_OF_MEMORY, "Malloc failed"); + + message_size = sizeof(delay_message_info_s); + + message->sequence = sequence; + tmp_size = strlen(local_port) + 1; + message_size += tmp_size; + message->local_port_len = tmp_size; + message->local_port_name = strdup(local_port); + if (message->local_port_name == NULL) { + _LOGE("local_port_name strdup fail"); + ret = MESSAGEPORT_ERROR_OUT_OF_MEMORY; + goto out; + } + message->is_bidirection = is_bidirection; + message->local_trusted = local_trusted; + message_size += data_len; + message->data_len = data_len; + message->data = (bundle_raw *)strdup((const char *)kb_data); + if (message->data == NULL) { + _LOGE("data strdup fail"); + ret = MESSAGEPORT_ERROR_OUT_OF_MEMORY; + goto out; + } + + + message->sent_bytes = sent_bytes; + message->size = message_size; + port_info->delayed_message_size += message_size; + + port_info->delayed_message_list = g_list_append(port_info->delayed_message_list, message); + + if (port_info->delay_src_id == 0) { + port_info->delay_src_id = g_unix_fd_add_full(G_PRIORITY_DEFAULT, + port_info->send_sock_fd, G_IO_OUT, __process_delayed_message, + port_info, NULL); + } + + _LOGE("inserted : pm(%s) fd(%d) ms(%d) ds(%d) dlc(%d) sqn(%d) sb (%d)", + port_info->port_name, port_info->send_sock_fd, message_size, + port_info->delayed_message_size, + g_list_length(port_info->delayed_message_list), sequence, sent_bytes); + + + +out: + if (ret != MESSAGEPORT_ERROR_NONE) + __free_delay_message_info(message); + + return ret; +} + +int __message_port_send_async(port_list_info_s *port_info, bundle *kb, const char *local_port, bool local_trusted, bool is_bidirection) { int ret = 0; int data_len; int local_port_len = 0; - unsigned int nb; + unsigned int nb = 0; bundle_raw *kb_data = NULL; + int sequence = SEQUENCE_START; bundle_encode(kb, &kb_data, &data_len); if (kb_data == NULL) { _LOGE("bundle encode fail"); - ret = MESSAGEPORT_ERROR_IO_ERROR; + ret = MESSAGEPORT_ERROR_INVALID_PARAMETER; goto out; } @@ -1442,33 +1773,52 @@ int __message_port_send_async(int sockfd, bundle *kb, const char *local_port, goto out; } + if (g_list_length(port_info->delayed_message_list) > 0) { + ret = MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE; + _LOGE("There are messages in the delayed_message_list (count %d)", + g_list_length(port_info->delayed_message_list)); + goto out; + } + if (local_port != NULL) local_port_len = strlen(local_port) + 1; - if (__write_string_to_socket(sockfd, local_port, local_port_len) != MESSAGEPORT_ERROR_NONE) { + ret = __write_string_to_socket(port_info->send_sock_fd, local_port, + local_port_len, &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { _LOGE("write local_port fail"); - ret = MESSAGEPORT_ERROR_IO_ERROR; goto out; } - if (__write_socket(sockfd, (char *)&is_bidirection, sizeof(is_bidirection), &nb) != MESSAGEPORT_ERROR_NONE) { + ret = __write_socket(port_info->send_sock_fd, (char *)&is_bidirection, + sizeof(is_bidirection), &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { _LOGE("write is_bidirection fail"); - ret = MESSAGEPORT_ERROR_IO_ERROR; goto out; } - if (__write_socket(sockfd, (char *)&local_trusted, sizeof(local_trusted), &nb) != MESSAGEPORT_ERROR_NONE) { + ret = __write_socket(port_info->send_sock_fd, (char *)&local_trusted, + sizeof(local_trusted), &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { _LOGE("write local_trusted fail"); - ret = MESSAGEPORT_ERROR_IO_ERROR; goto out; } - if (__write_string_to_socket(sockfd, (void *)kb_data, data_len) != MESSAGEPORT_ERROR_NONE) { + ret = __write_string_to_socket(port_info->send_sock_fd, (void *)kb_data, + data_len, &nb, &sequence); + if (ret != MESSAGEPORT_ERROR_NONE) { _LOGE("write kb_data fail"); - ret = MESSAGEPORT_ERROR_IO_ERROR; goto out; } + out: + if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) { + ret = __insert_delayed_message(port_info, sequence, kb_data, data_len, nb, + local_port, local_trusted, is_bidirection); + if (ret != MESSAGEPORT_ERROR_NONE) + ret = MESSAGEPORT_ERROR_IO_ERROR; + } + if (kb_data) free(kb_data); @@ -1512,7 +1862,7 @@ static int __message_port_send_message(const char *remote_appid, const char *rem } if (port_info->send_sock_fd > 0) { - ret = __message_port_send_async(port_info->send_sock_fd, message, + ret = __message_port_send_async(port_info, message, (local_port) ? local_port : "", local_trusted, bi_dir); } else { diff --git a/src/message_port.c b/src/message_port.c old mode 100644 new mode 100755 index d6d50f3..2629c35 --- a/src/message_port.c +++ b/src/message_port.c @@ -30,7 +30,7 @@ typedef struct message_port_callback_item_s { static GHashTable *__listeners; static GHashTable *__trusted_listeners; -static pthread_mutex_t __mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static void do_callback(message_port_message_cb callback, int local_port_id, const char *remote_app_id, const char *remote_port, bool trusted_remote_port, bundle *message, void *user_data) { @@ -75,13 +75,13 @@ int message_port_register_local_port(const char *local_port, message_port_messag if (__listeners == NULL) __listeners = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free); - pthread_mutex_lock(&__mutex); + pthread_mutex_lock(&mutex); message_port_callback_item *item = (message_port_callback_item *)g_hash_table_lookup(__listeners, GINT_TO_POINTER(local_port_id)); if (item == NULL) { item = (message_port_callback_item *)calloc(1, sizeof(message_port_callback_item)); if (item == NULL) { - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); return MESSAGE_PORT_ERROR_OUT_OF_MEMORY; } @@ -90,7 +90,7 @@ int message_port_register_local_port(const char *local_port, message_port_messag item->callback = callback; item->user_data = user_data; - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); } else { _SECURE_LOGI("Register local port fail (%d).", local_port_id); @@ -113,13 +113,13 @@ int message_port_register_trusted_local_port(const char *local_port, message_por if (__trusted_listeners == NULL) __trusted_listeners = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free); - pthread_mutex_lock(&__mutex); + pthread_mutex_lock(&mutex); message_port_callback_item *item = (message_port_callback_item *)g_hash_table_lookup(__trusted_listeners, GINT_TO_POINTER(trusted_local_port_id)); if (item == NULL) { item = (message_port_callback_item *)calloc(1, sizeof(message_port_callback_item)); if (item == NULL) { - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); return MESSAGE_PORT_ERROR_OUT_OF_MEMORY; } @@ -128,7 +128,7 @@ int message_port_register_trusted_local_port(const char *local_port, message_por item->callback = callback; item->user_data = user_data; - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); } else { _SECURE_LOGI("Register trusted local port fail (%d).", trusted_local_port_id); } @@ -193,9 +193,9 @@ int message_port_send_message(const char *remote_app_id, const char *remote_port } _SECURE_LOGI("Send a message to (%s):(%s).", remote_app_id, remote_port); - pthread_mutex_lock(&__mutex); + pthread_mutex_lock(&mutex); ret = messageport_send_message(remote_app_id, remote_port, message); - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); return convert_to_tizen_error((messageport_error_e)ret); } @@ -209,9 +209,9 @@ int message_port_send_trusted_message(const char *remote_app_id, const char *rem } _SECURE_LOGI("Send a trusted message to (%s):(%s).", remote_app_id, remote_port); - pthread_mutex_lock(&__mutex); + pthread_mutex_lock(&mutex); ret = messageport_send_trusted_message(remote_app_id, remote_port, message); - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); return convert_to_tizen_error((messageport_error_e)ret); } @@ -244,9 +244,9 @@ int message_port_send_message_with_local_port(const char *remote_app_id, const c } _SECURE_LOGI("Send a message to (%s):(%s) and listen at the local port ID (%d).", remote_app_id, remote_port, local_port_id); - pthread_mutex_lock(&__mutex); + pthread_mutex_lock(&mutex); ret = messageport_send_bidirectional_message(local_port_id, remote_app_id, remote_port, message); - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); return convert_to_tizen_error((messageport_error_e)ret); } @@ -277,9 +277,9 @@ int message_port_send_trusted_message_with_local_port(const char *remote_app_id, } _SECURE_LOGI("Send a trusted message to (%s):(%s) and listen at the local port ID (%d).", remote_app_id, remote_port, local_port_id); - pthread_mutex_lock(&__mutex); + pthread_mutex_lock(&mutex); ret = messageport_send_bidirectional_trusted_message(local_port_id, remote_app_id, remote_port, message); - pthread_mutex_unlock(&__mutex); + pthread_mutex_unlock(&mutex); return convert_to_tizen_error((messageport_error_e)ret); } -- 2.7.4