#include <aul.h>
#include <gio/gio.h>
#include <gio/gunixfdlist.h>
+#include <pthread.h>
+#include <glib-unix.h>
+#include <poll.h>
#include "message-port.h"
#include "message-port-log.h"
bool exist;
GIOChannel *gio_read;
int g_src_id;
+ GList *delayed_message_list;
+ unsigned int delayed_message_size;
+ int delay_src_id;
} port_list_info_s;
+enum transmission_sequence {
+ SEQUENCE_START = 0,
+ SEQUENCE_PORT_LEN,
+ SEQUENCE_PORT_NAME,
+ SEQUENCE_BIDIRECTION,
+ SEQUENCE_TRUSTED,
+ SEQUENCE_DTAT_LEN,
+ SEQUENCE_DATA,
+ SEQUENCE_END
+};
+
+typedef struct delay_message {
+ unsigned int size;
+ unsigned int sent_bytes;
+ int sequence;
+ int local_port_len;
+ char *local_port_name;
+ bool is_bidirection;
+ bool local_trusted;
+ int data_len;
+ bundle_raw *data;
+} delay_message_info_s;
+
+
+extern pthread_mutex_t mutex;
+static void __free_list_delay_message_info(gpointer data);
+
+
static void __callback_info_free(gpointer data)
{
message_port_callback_info_s *callback_info = (message_port_callback_info_s *)data;
g_source_remove(port_info->g_src_id);
port_info->g_src_id = 0;
}
+
+ if (port_info->delay_src_id != 0) {
+ g_source_remove(port_info->delay_src_id);
+ port_info->delay_src_id = 0;
+ }
+
+ if (port_info->delayed_message_list != NULL) {
+ g_list_free_full(port_info->delayed_message_list, __free_list_delay_message_info);
+ /* can be reused */
+ port_info->delayed_message_list = NULL;
+ }
+
+ port_info->delayed_message_size = 0;
port_info->send_sock_fd = 0;
}
static int __write_socket(int fd,
const char *buffer,
unsigned int nbytes,
- unsigned int *bytes_write)
+ unsigned int *bytes_write,
+ int *sequence)
{
+#define SEND_TIMEOUT 500 /* milliseconds */
+
unsigned int left = nbytes;
ssize_t nb;
- int retry_cnt = 0;
+ struct pollfd fds[1];
+ int ret;
+ *sequence += 1;
*bytes_write = 0;
- while (left && (retry_cnt < MAX_RETRY_CNT)) {
+
+ fds[0].fd = fd;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
+
+ ret = poll(fds, 1, SEND_TIMEOUT);
+ if (ret == 0) {
+ LOGE("__write_socket: : fd %d poll timeout", fd);
+ return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+ }
+
+ while (left) {
nb = write(fd, buffer, left);
if (nb == -1) {
- if (errno == EINTR) {
- LOGE("__write_socket: EINTR error continue ...");
- retry_cnt++;
- continue;
- }
LOGE("__write_socket: ...error fd %d: errno %d\n", fd, errno);
+
+ if (errno == EWOULDBLOCK || errno == EAGAIN)
+ return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+
return MESSAGEPORT_ERROR_IO_ERROR;
}
left -= nb;
buffer += nb;
*bytes_write += nb;
- retry_cnt = 0;
}
return MESSAGEPORT_ERROR_NONE;
}
-static int __write_string_to_socket(int fd, const char *buffer, int string_len)
+static int __write_string_to_socket(int fd,
+ const char *buffer,
+ int string_len,
+ unsigned int *bytes_write,
+ int *sequence)
{
- unsigned int nb;
- if (__write_socket(fd, (char *)&string_len, sizeof(string_len), &nb) != MESSAGEPORT_ERROR_NONE) {
+ int ret;
+
+ ret = __write_socket(fd, (char *)&string_len, sizeof(string_len),
+ bytes_write, sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
_LOGE("write string_len fail");
- return MESSAGEPORT_ERROR_IO_ERROR;
+ return ret;
}
if (string_len > 0) {
- if (__write_socket(fd, buffer, string_len, &nb) != MESSAGEPORT_ERROR_NONE) {
+ ret = __write_socket(fd, buffer, string_len, bytes_write, sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
_LOGE("wirte buffer fail");
- return MESSAGEPORT_ERROR_IO_ERROR;
+ return ret;
}
+ } else {
+ *sequence += 1;
}
+
return MESSAGEPORT_ERROR_NONE;
}
return MESSAGEPORT_ERROR_IO_ERROR;
} else if (nb == -1) {
/* wrt(nodejs) could change socket to none-blocking socket :-( */
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- LOGE("__read_socket: %d errno, sleep and retry ...", errno);
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ LOGI("__read_socket: %d , sleep and retry ...", errno);
retry_cnt++;
nanosleep(&TRY_SLEEP_TIME, 0);
continue;
*bytes_read += nb;
retry_cnt = 0;
}
+
+ if (left != 0) {
+ LOGE("error fd %d: retry_cnt %d", fd, retry_cnt);
+ return MESSAGEPORT_ERROR_IO_ERROR;
+ }
+
return MESSAGEPORT_ERROR_NONE;
}
return local_id;
}
-int __message_port_send_async(int sockfd, bundle *kb, const char *local_port,
+static void __free_delay_message_info(delay_message_info_s *message)
+{
+ if (message != NULL) {
+ FREE_AND_NULL(message->local_port_name);
+ FREE_AND_NULL(message->data);
+ FREE_AND_NULL(message);
+ }
+}
+
+static void __free_list_delay_message_info(gpointer data)
+{
+ delay_message_info_s *message = (delay_message_info_s *)data;
+
+ if (message != NULL)
+ __free_delay_message_info(message);
+}
+
+static int __send_delayed_message(int sockfd, delay_message_info_s *message)
+{
+ unsigned int nb = 0;
+ int sequence = message->sequence - 1;
+ int ret = MESSAGEPORT_ERROR_NONE;
+ bool is_startline = true;
+ int offset = 0;
+
+ _LOGI("send_delayed_message : sockfd (%d) sequence(%d) sent byte(%d)",
+ sockfd, message->sequence, message->sent_bytes);
+
+ switch (message->sequence) {
+ case SEQUENCE_START:
+ sequence++;
+ is_startline = false;
+
+ case SEQUENCE_PORT_LEN:
+ if (is_startline)
+ offset = message->sent_bytes;
+
+ ret = __write_socket(sockfd, ((char *)&message->local_port_len) + offset,
+ sizeof(message->local_port_len) - offset, &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
+ _LOGE("write local_port_len fail");
+ goto out;
+ }
+ offset = 0;
+ is_startline = false;
+
+ case SEQUENCE_PORT_NAME:
+ if (is_startline)
+ offset = message->sent_bytes;
+
+ if (message->local_port_len > 0)
+ ret = __write_socket(sockfd, message->local_port_name + offset,
+ message->local_port_len - offset , &nb, &sequence);
+ else
+ sequence++;
+
+ if (ret != MESSAGEPORT_ERROR_NONE) {
+ _LOGE("write local_port fail");
+ goto out;
+ }
+ offset = 0;
+ is_startline = false;
+
+ case SEQUENCE_BIDIRECTION:
+ if (is_startline)
+ offset = message->sent_bytes;
+
+ ret = __write_socket(sockfd, ((char *)&message->is_bidirection) + offset,
+ sizeof(message->is_bidirection) - offset, &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
+ _LOGE("write is_bidirection fail");
+ goto out;
+ }
+ offset = 0;
+ is_startline = false;
+
+ case SEQUENCE_TRUSTED:
+ if (is_startline)
+ offset = message->sent_bytes;
+
+ ret = __write_socket(sockfd, ((char *)&message->local_trusted) + offset,
+ sizeof(message->local_trusted) - offset, &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
+ _LOGE("write local_trusted fail");
+ goto out;
+ }
+ offset = 0;
+ is_startline = false;
+
+ case SEQUENCE_DTAT_LEN:
+ if (is_startline)
+ offset = message->sent_bytes;
+
+ ret = __write_socket(sockfd, ((char *)&message->data_len) + offset,
+ sizeof(message->data_len) - offset, &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
+ _LOGE("write data_len fail");
+ goto out;
+ }
+ offset = 0;
+ is_startline = false;
+
+ case SEQUENCE_DATA:
+ if (is_startline)
+ offset = message->sent_bytes;
+
+ ret = __write_socket(sockfd, (char *)message->data + offset,
+ message->data_len -offset, &nb, &sequence);
+
+ if (ret != MESSAGEPORT_ERROR_NONE) {
+ _LOGE("write data fail");
+ goto out;
+ }
+ offset = 0;
+ is_startline = false;
+
+ default:
+ ret = MESSAGEPORT_ERROR_NONE;
+
+ }
+
+out:
+ if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) {
+ if (is_startline)
+ message->sent_bytes += nb;
+ else
+ message->sent_bytes = nb;
+
+ message->sequence = sequence;
+ _LOGE("send_delayed_message fail : sockfd (%d) sequence(%d) sent byte(%d)",
+ sockfd, message->sequence, message->sent_bytes);
+ }
+
+ return ret;
+
+}
+
+static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer data)
+{
+ port_list_info_s *port_info = (port_list_info_s *)data;
+ delay_message_info_s *message;
+ int ret;
+
+ if (port_info == NULL)
+ return G_SOURCE_REMOVE;
+
+ pthread_mutex_lock(&mutex);
+
+ if (port_info->delayed_message_list == NULL) {
+ port_info->delayed_message_size = 0;
+ port_info->delay_src_id = 0;
+ pthread_mutex_unlock(&mutex);
+ return G_SOURCE_REMOVE;
+ } else {
+ message = g_list_nth_data(port_info->delayed_message_list, 0);
+ ret = __send_delayed_message(port_info->send_sock_fd, message);
+
+ if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) {
+ pthread_mutex_unlock(&mutex);
+ return G_SOURCE_CONTINUE;
+ } else if (ret == MESSAGEPORT_ERROR_IO_ERROR) {
+ __clear_disconnect_socket(port_info);
+ pthread_mutex_unlock(&mutex);
+ return G_SOURCE_REMOVE;
+ }
+
+ port_info->delayed_message_size -= message->size;
+
+ port_info->delayed_message_list = g_list_remove(port_info->delayed_message_list, message);
+ __free_delay_message_info(message);
+ }
+
+ pthread_mutex_unlock(&mutex);
+
+ return G_SOURCE_CONTINUE;
+}
+
+static int __insert_delayed_message(port_list_info_s *port_info,
+ int sequence,
+ bundle_raw *kb_data,
+ int data_len,
+ unsigned int sent_bytes,
+ const char *local_port,
+ bool local_trusted,
+ bool is_bidirection)
+{
+#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB per remote port (MAX) */
+
+ unsigned int tmp_size;
+ unsigned int message_size;
+ int ret = MESSAGEPORT_ERROR_NONE;
+
+ if (port_info->delayed_message_size >= QUEUE_SIZE_MAX) {
+ _LOGE("cache fail : delayed_message_size (%d), count(%d)",
+ port_info->delayed_message_size, g_list_length(port_info->delayed_message_list));
+ return MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+ }
+
+ delay_message_info_s *message = (delay_message_info_s *)calloc(1, sizeof(delay_message_info_s));
+ retvm_if(!message, MESSAGEPORT_ERROR_OUT_OF_MEMORY, "Malloc failed");
+
+ message_size = sizeof(delay_message_info_s);
+
+ message->sequence = sequence;
+ tmp_size = strlen(local_port) + 1;
+ message_size += tmp_size;
+ message->local_port_len = tmp_size;
+ message->local_port_name = strdup(local_port);
+ if (message->local_port_name == NULL) {
+ _LOGE("local_port_name strdup fail");
+ ret = MESSAGEPORT_ERROR_OUT_OF_MEMORY;
+ goto out;
+ }
+ message->is_bidirection = is_bidirection;
+ message->local_trusted = local_trusted;
+ message_size += data_len;
+ message->data_len = data_len;
+ message->data = (bundle_raw *)strdup((const char *)kb_data);
+ if (message->data == NULL) {
+ _LOGE("data strdup fail");
+ ret = MESSAGEPORT_ERROR_OUT_OF_MEMORY;
+ goto out;
+ }
+
+
+ message->sent_bytes = sent_bytes;
+ message->size = message_size;
+ port_info->delayed_message_size += message_size;
+
+ port_info->delayed_message_list = g_list_append(port_info->delayed_message_list, message);
+
+ if (port_info->delay_src_id == 0) {
+ port_info->delay_src_id = g_unix_fd_add_full(G_PRIORITY_DEFAULT,
+ port_info->send_sock_fd, G_IO_OUT, __process_delayed_message,
+ port_info, NULL);
+ }
+
+ _LOGE("inserted : pm(%s) fd(%d) ms(%d) ds(%d) dlc(%d) sqn(%d) sb (%d)",
+ port_info->port_name, port_info->send_sock_fd, message_size,
+ port_info->delayed_message_size,
+ g_list_length(port_info->delayed_message_list), sequence, sent_bytes);
+
+
+
+out:
+ if (ret != MESSAGEPORT_ERROR_NONE)
+ __free_delay_message_info(message);
+
+ return ret;
+}
+
+int __message_port_send_async(port_list_info_s *port_info, bundle *kb, const char *local_port,
bool local_trusted, bool is_bidirection)
{
int ret = 0;
int data_len;
int local_port_len = 0;
- unsigned int nb;
+ unsigned int nb = 0;
bundle_raw *kb_data = NULL;
+ int sequence = SEQUENCE_START;
bundle_encode(kb, &kb_data, &data_len);
if (kb_data == NULL) {
_LOGE("bundle encode fail");
- ret = MESSAGEPORT_ERROR_IO_ERROR;
+ ret = MESSAGEPORT_ERROR_INVALID_PARAMETER;
goto out;
}
goto out;
}
+ if (g_list_length(port_info->delayed_message_list) > 0) {
+ ret = MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE;
+ _LOGE("There are messages in the delayed_message_list (count %d)",
+ g_list_length(port_info->delayed_message_list));
+ goto out;
+ }
+
if (local_port != NULL)
local_port_len = strlen(local_port) + 1;
- if (__write_string_to_socket(sockfd, local_port, local_port_len) != MESSAGEPORT_ERROR_NONE) {
+ ret = __write_string_to_socket(port_info->send_sock_fd, local_port,
+ local_port_len, &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
_LOGE("write local_port fail");
- ret = MESSAGEPORT_ERROR_IO_ERROR;
goto out;
}
- if (__write_socket(sockfd, (char *)&is_bidirection, sizeof(is_bidirection), &nb) != MESSAGEPORT_ERROR_NONE) {
+ ret = __write_socket(port_info->send_sock_fd, (char *)&is_bidirection,
+ sizeof(is_bidirection), &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
_LOGE("write is_bidirection fail");
- ret = MESSAGEPORT_ERROR_IO_ERROR;
goto out;
}
- if (__write_socket(sockfd, (char *)&local_trusted, sizeof(local_trusted), &nb) != MESSAGEPORT_ERROR_NONE) {
+ ret = __write_socket(port_info->send_sock_fd, (char *)&local_trusted,
+ sizeof(local_trusted), &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
_LOGE("write local_trusted fail");
- ret = MESSAGEPORT_ERROR_IO_ERROR;
goto out;
}
- if (__write_string_to_socket(sockfd, (void *)kb_data, data_len) != MESSAGEPORT_ERROR_NONE) {
+ ret = __write_string_to_socket(port_info->send_sock_fd, (void *)kb_data,
+ data_len, &nb, &sequence);
+ if (ret != MESSAGEPORT_ERROR_NONE) {
_LOGE("write kb_data fail");
- ret = MESSAGEPORT_ERROR_IO_ERROR;
goto out;
}
+
out:
+ if (ret == MESSAGEPORT_ERROR_RESOURCE_UNAVAILABLE) {
+ ret = __insert_delayed_message(port_info, sequence, kb_data, data_len, nb,
+ local_port, local_trusted, is_bidirection);
+ if (ret != MESSAGEPORT_ERROR_NONE)
+ ret = MESSAGEPORT_ERROR_IO_ERROR;
+ }
+
if (kb_data)
free(kb_data);
}
if (port_info->send_sock_fd > 0) {
- ret = __message_port_send_async(port_info->send_sock_fd, message,
+ ret = __message_port_send_async(port_info, message,
(local_port) ? local_port : "", local_trusted, bi_dir);
} else {
static GHashTable *__listeners;
static GHashTable *__trusted_listeners;
-static pthread_mutex_t __mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static void do_callback(message_port_message_cb callback, int local_port_id, const char *remote_app_id, const char *remote_port, bool trusted_remote_port, bundle *message, void *user_data)
{
if (__listeners == NULL)
__listeners = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free);
- pthread_mutex_lock(&__mutex);
+ pthread_mutex_lock(&mutex);
message_port_callback_item *item =
(message_port_callback_item *)g_hash_table_lookup(__listeners, GINT_TO_POINTER(local_port_id));
if (item == NULL) {
item = (message_port_callback_item *)calloc(1, sizeof(message_port_callback_item));
if (item == NULL) {
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
return MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
}
item->callback = callback;
item->user_data = user_data;
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
} else {
_SECURE_LOGI("Register local port fail (%d).", local_port_id);
if (__trusted_listeners == NULL)
__trusted_listeners = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free);
- pthread_mutex_lock(&__mutex);
+ pthread_mutex_lock(&mutex);
message_port_callback_item *item =
(message_port_callback_item *)g_hash_table_lookup(__trusted_listeners, GINT_TO_POINTER(trusted_local_port_id));
if (item == NULL) {
item = (message_port_callback_item *)calloc(1, sizeof(message_port_callback_item));
if (item == NULL) {
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
return MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
}
item->callback = callback;
item->user_data = user_data;
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
} else {
_SECURE_LOGI("Register trusted local port fail (%d).", trusted_local_port_id);
}
}
_SECURE_LOGI("Send a message to (%s):(%s).", remote_app_id, remote_port);
- pthread_mutex_lock(&__mutex);
+ pthread_mutex_lock(&mutex);
ret = messageport_send_message(remote_app_id, remote_port, message);
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
return convert_to_tizen_error((messageport_error_e)ret);
}
}
_SECURE_LOGI("Send a trusted message to (%s):(%s).", remote_app_id, remote_port);
- pthread_mutex_lock(&__mutex);
+ pthread_mutex_lock(&mutex);
ret = messageport_send_trusted_message(remote_app_id, remote_port, message);
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
return convert_to_tizen_error((messageport_error_e)ret);
}
}
_SECURE_LOGI("Send a message to (%s):(%s) and listen at the local port ID (%d).", remote_app_id, remote_port, local_port_id);
- pthread_mutex_lock(&__mutex);
+ pthread_mutex_lock(&mutex);
ret = messageport_send_bidirectional_message(local_port_id, remote_app_id, remote_port, message);
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
return convert_to_tizen_error((messageport_error_e)ret);
}
}
_SECURE_LOGI("Send a trusted message to (%s):(%s) and listen at the local port ID (%d).", remote_app_id, remote_port, local_port_id);
- pthread_mutex_lock(&__mutex);
+ pthread_mutex_lock(&mutex);
ret = messageport_send_bidirectional_trusted_message(local_port_id, remote_app_id, remote_port, message);
- pthread_mutex_unlock(&__mutex);
+ pthread_mutex_unlock(&mutex);
return convert_to_tizen_error((messageport_error_e)ret);
}