Add destroy handler for g_io_add_watch 89/158289/10
authorInkyun Kil <inkyun.kil@samsung.com>
Tue, 31 Oct 2017 00:11:46 +0000 (09:11 +0900)
committerInkyun Kil <inkyun.kil@samsung.com>
Tue, 21 Nov 2017 06:55:04 +0000 (15:55 +0900)
This is for applications running in a multithreaded environment.
When socket is disconnected in a main thread and port is unregistered in
another thread simultaneously, memory corruption can occur.

Change-Id: I54518e156f1a92ca11a9d77a1af0823ae9520e4b
Signed-off-by: Inkyun Kil <inkyun.kil@samsung.com>
Signed-off-by: jusung son <jusung07.son@samsung.com>
src/message_port.c
src/message_port_local.c
src/message_port_remote.c

index 877d054..89778b6 100755 (executable)
@@ -78,7 +78,9 @@ int message_port_unregister_local_port(int local_port_id)
                return MESSAGE_PORT_ERROR_INVALID_PARAMETER;
        }
 
+       pthread_mutex_lock(&mutex);
        res = unregister_local_port(local_port_id, false);
+       pthread_mutex_unlock(&mutex);
 
        return res;
 }
@@ -92,7 +94,9 @@ int message_port_unregister_trusted_local_port(int trusted_local_port_id)
                return MESSAGE_PORT_ERROR_INVALID_PARAMETER;
        }
 
+       pthread_mutex_lock(&mutex);
        res = unregister_local_port(trusted_local_port_id, true);
+       pthread_mutex_unlock(&mutex);
 
        return res;
 }
@@ -107,7 +111,9 @@ int message_port_check_remote_port(const char *remote_app_id, const char *remote
        }
        _LOGD("Check remote port (%s):(%s).", remote_app_id, remote_port);
 
+       pthread_mutex_lock(&mutex);
        ret = check_remote_port(remote_app_id, remote_port, false, exist);
+       pthread_mutex_unlock(&mutex);
 
        return ret;
 }
@@ -121,8 +127,9 @@ int message_port_check_trusted_remote_port(const char *remote_app_id, const char
                return MESSAGE_PORT_ERROR_INVALID_PARAMETER;
        }
        _LOGD("Check trusted remote port (%s):(%s).", remote_app_id, remote_port);
-
+       pthread_mutex_lock(&mutex);
        ret = check_remote_port(remote_app_id, remote_port, true, exist);
+       pthread_mutex_unlock(&mutex);
 
        return ret;
 }
index d9a01ea..5b61c85 100755 (executable)
@@ -70,6 +70,17 @@ typedef struct port_list_info {
        int delay_src_id;
 } port_list_info_s;
 
+typedef struct port_key_info {
+       char *remote_app_id;
+       char *port_name;
+       bool is_trusted;
+} port_key_info_s;
+
+typedef struct delay_port_info {
+       port_key_info_s *key_info;
+       port_list_info_s *port_info;
+} delay_port_info;
+
 typedef struct registered_callback_info {
        char *remote_app_id;
        char *remote_port;
@@ -261,6 +272,17 @@ static int __remote_port_compare_cb(gconstpointer a, gconstpointer b)
        return 1;
 }
 
+static int __key_compare_cb(gconstpointer a, gconstpointer b)
+{
+       port_list_info_s *key1 = (port_list_info_s *)a;
+       port_key_info_s *key2 = (port_key_info_s *)b;
+
+       if (key1->is_trusted == key2->is_trusted)
+               return strcmp(key1->port_name, key2->port_name);
+
+       return 1;
+}
+
 static port_list_info_s *__set_remote_port_info(const char *remote_app_id, const char *remote_port, bool is_trusted)
 {
        int ret_val = MESSAGE_PORT_ERROR_NONE;
@@ -322,16 +344,112 @@ out:
        return remote_app_info;
 }
 
+
+static void __free_port_info_by_key(port_key_info_s *key_info)
+{
+       port_list_info_s *found_port_info;
+       message_port_remote_app_info_s *found_remote_port_info;
+       GList *cb_list;
+
+       found_remote_port_info =
+               (message_port_remote_app_info_s *)g_hash_table_lookup(
+                               __remote_app_info, key_info->remote_app_id);
+       if (found_remote_port_info == NULL)
+               goto release;
+
+       cb_list = g_list_find_custom(found_remote_port_info->port_list, key_info,
+                                       (GCompareFunc)__key_compare_cb);
+       if (cb_list == NULL)
+               goto release;
+
+       found_port_info = (port_list_info_s *)cb_list->data;
+       __free_port_info(found_port_info);
+
+       return;
+
+release:
+       _LOGE("Not found port_info");
+}
+
+static void __free_key_info(port_key_info_s *key_info)
+{
+       FREE_AND_NULL(key_info->port_name);
+       FREE_AND_NULL(key_info->remote_app_id);
+       FREE_AND_NULL(key_info);
+}
+
 static gboolean __socket_disconnect_handler(GIOChannel *gio,
                GIOCondition cond,
                gpointer data)
 {
        _LOGI("__socket_disconnect_handler %d", cond);
-       __free_port_info(data);
+       pthread_mutex_lock(&mutex);
+       __free_port_info_by_key((port_key_info_s *)data);
+       pthread_mutex_unlock(&mutex);
 
        return FALSE;
 }
 
+static void __socket_destroy_handler(gpointer data)
+{
+       _LOGI("__socket_destroy_handler");
+
+       port_key_info_s *key_info = (port_key_info_s *)data;
+       __free_key_info(key_info);
+}
+
+
+static void __delay_socket_destroy_handler(gpointer data)
+{
+       _LOGI("__delay_socket_destroy_handler");
+       delay_port_info *delay_info = (delay_port_info *)data;
+
+       FREE_AND_NULL(delay_info->key_info->port_name);
+       FREE_AND_NULL(delay_info->key_info->remote_app_id);
+       FREE_AND_NULL(delay_info->key_info);
+       free(delay_info);
+}
+
+static int __create_port_key_info(
+               port_list_info_s *port_info,
+               port_key_info_s **key_info)
+{
+       int ret_val = MESSAGE_PORT_ERROR_NONE;
+       port_key_info_s *_key_info = (port_key_info_s *)
+               calloc(1, sizeof(port_key_info_s));
+       if (_key_info == NULL) {
+               ret_val = MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
+               goto out;
+       }
+
+       _key_info->port_name = strdup(port_info->port_name);
+       if (_key_info->port_name == NULL) {
+               ret_val = MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
+               goto out;
+       }
+
+       _key_info->is_trusted = port_info->is_trusted;
+
+       _key_info->remote_app_id = strdup(port_info->remote_app_info->remote_app_id);
+       if (_key_info->remote_app_id == NULL) {
+               ret_val = MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
+               goto out;
+       }
+
+out:
+       if (ret_val == MESSAGE_PORT_ERROR_OUT_OF_MEMORY) {
+               _LOGE("out of memory");
+               if (_key_info) {
+                       FREE_AND_NULL(_key_info->port_name);
+                       FREE_AND_NULL(_key_info->remote_app_id);
+                       free(_key_info);
+               }
+       }
+
+       *key_info = _key_info;
+       return ret_val;
+}
+
 static int __get_remote_port_info(const char *remote_app_id, const char *remote_port, bool is_trusted,
                message_port_remote_app_info_s **mri, port_list_info_s **pli)
 {
@@ -341,7 +459,6 @@ static int __get_remote_port_info(const char *remote_app_id, const char *remote_
        int ret_val = MESSAGE_PORT_ERROR_NONE;
 
        remote_app_info = (message_port_remote_app_info_s *)g_hash_table_lookup(__remote_app_info, remote_app_id);
-
        if (remote_app_info == NULL) {
                remote_app_info = __set_remote_app_info(remote_app_id, remote_port, is_trusted);
 
@@ -597,10 +714,28 @@ out:
 }
 /* LCOV_EXCL_STOP */
 
+static bool __validate_delay_port_info(delay_port_info *delay_info)
+{
+       message_port_remote_app_info_s *found_remote_port_info;
+       GList *cb_list;
+
+       found_remote_port_info =
+               (message_port_remote_app_info_s *)g_hash_table_lookup(
+                               __remote_app_info, delay_info->key_info->remote_app_id);
+       if (found_remote_port_info == NULL)
+               return false;
+
+       cb_list = g_list_find(found_remote_port_info->port_list, delay_info->port_info);
+       if (cb_list == NULL)
+               return false;
+
+       return true;
+}
 /* LCOV_EXCL_START */
 static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer data)
 {
-       port_list_info_s *port_info = (port_list_info_s *)data;
+       delay_port_info *delay_info = (delay_port_info *)data;
+       port_list_info_s *port_info = delay_info->port_info;
        delay_message_info_s *message;
        int ret;
 
@@ -609,6 +744,11 @@ static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer d
 
        pthread_mutex_lock(&mutex);
 
+       if (__validate_delay_port_info(delay_info) == false) {
+               pthread_mutex_unlock(&mutex);
+               return G_SOURCE_REMOVE;
+       }
+
        if (port_info->delayed_message_list == NULL) {
                port_info->delayed_message_size = 0;
                port_info->delay_src_id = 0;
@@ -622,7 +762,7 @@ static gboolean __process_delayed_message(gint fd, GIOCondition cond, gpointer d
                        pthread_mutex_unlock(&mutex);
                        return G_SOURCE_CONTINUE;
                } else if (ret == MESSAGE_PORT_ERROR_IO_ERROR) {
-                       __free_port_info((gpointer)port_info);
+                       port_info->delay_src_id = 0;
                        pthread_mutex_unlock(&mutex);
                        return G_SOURCE_REMOVE;
                }
@@ -654,6 +794,7 @@ static int __insert_delayed_message(port_list_info_s *port_info,
        unsigned int tmp_size;
        unsigned int message_size;
        int ret = MESSAGE_PORT_ERROR_NONE;
+       delay_port_info *delay_info;
 
        if (port_info->delayed_message_size >= QUEUE_SIZE_MAX) {
                _LOGE("cache fail : delayed_message_size (%d), count(%d)",
@@ -695,9 +836,24 @@ static int __insert_delayed_message(port_list_info_s *port_info,
        port_info->delayed_message_list = g_list_append(port_info->delayed_message_list, message);
 
        if (port_info->delay_src_id == 0) {
-                       port_info->delay_src_id = g_unix_fd_add_full(G_PRIORITY_DEFAULT,
-                                                       port_info->send_sock_fd, G_IO_OUT, __process_delayed_message,
-                                                       port_info, NULL);
+               delay_info = (delay_port_info *)calloc(1, sizeof(delay_port_info));
+               if (delay_info == NULL) {
+                       _LOGE("out of memory");
+                       ret = MESSAGE_PORT_ERROR_OUT_OF_MEMORY;
+                       goto out;
+               }
+
+               ret = __create_port_key_info(port_info, &delay_info->key_info);
+               if (ret != MESSAGE_PORT_ERROR_NONE) {
+                       free(delay_info);
+                       goto out;
+               }
+
+               delay_info->port_info = port_info;
+
+               port_info->delay_src_id = g_unix_fd_add_full(G_PRIORITY_DEFAULT,
+                                               port_info->send_sock_fd, G_IO_OUT, __process_delayed_message,
+                                               delay_info, __delay_socket_destroy_handler);
        }
 
        _LOGE("inserted : pm(%s) fd(%d) ms(%d) ds(%d) dlc(%d) sqn(%d) sb (%d)",
@@ -705,8 +861,6 @@ static int __insert_delayed_message(port_list_info_s *port_info,
                port_info->delayed_message_size,
                g_list_length(port_info->delayed_message_list), sequence, sent_bytes);
 
-
-
 out:
        if (ret != MESSAGE_PORT_ERROR_NONE)
                __free_delay_message_info(message);
@@ -805,6 +959,7 @@ int send_message(const char *remote_appid, const char *remote_port,
        GVariant *body = NULL;
        int sock_pair[2] = {0,};
        char buf[1024];
+       port_key_info_s *__key_info;
 
        if (!_initialized) {
                if (!__initialize())
@@ -819,10 +974,11 @@ int send_message(const char *remote_appid, const char *remote_port,
                bool exist = false;
                _LOGD("port exist check !!");
                ret =  check_remote_port(remote_appid, remote_port, trusted_message, &exist);
-               if (ret != MESSAGE_PORT_ERROR_NONE)
+               if (ret != MESSAGE_PORT_ERROR_NONE) {
                        return ret;
-               else if (!exist)
+               } else if (!exist) {
                        return MESSAGE_PORT_ERROR_PORT_NOT_FOUND;
+               }
        }
 
        if (port_info->send_sock_fd > 0) {
@@ -875,14 +1031,23 @@ int send_message(const char *remote_appid, const char *remote_port,
                                        goto out;
                                }
 
-                               port_info->g_src_id = g_io_add_watch(
+                               ret = __create_port_key_info(port_info, &__key_info);
+                               if (ret != MESSAGE_PORT_ERROR_NONE) {
+                                       _LOGE("out of memory");
+                                       goto out;
+                               }
+
+                               port_info->g_src_id = g_io_add_watch_full(
                                                port_info->gio_read,
+                                               G_PRIORITY_DEFAULT,
                                                G_IO_IN | G_IO_HUP,
                                                __socket_disconnect_handler,
-                                               (gpointer)port_info);
+                                               (gpointer)__key_info,
+                                               __socket_destroy_handler);
                                if (port_info->g_src_id == 0) {
                                        _LOGE("fail to add watch on socket");
                                        ret = MESSAGE_PORT_ERROR_IO_ERROR;
+                                       __free_key_info(__key_info);
                                        goto out;
                                }
 
@@ -916,7 +1081,14 @@ out:
                g_object_unref(fd_list);
 
        if (ret != MESSAGE_PORT_ERROR_NONE) {
-               __free_port_info((gpointer)port_info);
+               __key_info = NULL;
+               __create_port_key_info(port_info, &__key_info);
+
+               if (__key_info != NULL) {
+                       __free_port_info_by_key(__key_info);
+                       __free_key_info(__key_info);
+               }
+
                if (sock_pair[SOCK_PAIR_SENDER])
                        close(sock_pair[SOCK_PAIR_SENDER]);
                if (sock_pair[SOCK_PAIR_RECEIVER])
index dc743f5..73778c9 100755 (executable)
@@ -50,6 +50,8 @@ static GHashTable *__trusted_app_list_hash;
 static GHashTable *__callback_info_hash;
 static GHashTable *__sender_appid_hash;
 
+extern pthread_mutex_t mutex;
+
 typedef struct message_port_pkt {
        int remote_port_name_len;
        char *remote_port_name;
@@ -67,6 +69,11 @@ typedef struct message_port_callback_info {
        int g_src_id;
 } message_port_callback_info_s;
 
+typedef struct callback_key_info {
+       int local_id;
+       message_port_callback_info_s *callback_info;
+} callback_key_info_s;
+
 static void __callback_info_free(gpointer data)
 {
        message_port_callback_info_s *callback_info = (message_port_callback_info_s *)data;
@@ -77,6 +84,13 @@ static void __callback_info_free(gpointer data)
        if (callback_info->remote_app_id)
                FREE_AND_NULL(callback_info->remote_app_id);
 
+       if (callback_info->local_info) {
+               if (callback_info->local_info->port_name)
+                               FREE_AND_NULL(callback_info->local_info->port_name);
+
+               FREE_AND_NULL(callback_info->local_info);
+       }
+
        if (callback_info->gio_read != NULL) {
                g_io_channel_shutdown(callback_info->gio_read, TRUE, &error);
                if (error) {
@@ -116,8 +130,8 @@ static void __callback_info_free_by_info(message_port_callback_info_s *callback_
 
 static void __hash_destroy_callback_info(gpointer data)
 {
-
        GList *callback_list = (GList *)data;
+
        if (callback_list != NULL)
                g_list_free_full(callback_list, __callback_info_free);
 }
@@ -276,61 +290,104 @@ static message_port_pkt_s *__message_port_recv_raw(int fd)
        return pkt;
 }
 
+static bool __validate_callback_info(callback_key_info_s *key_info)
+{
+       GList *cb_list;
+
+       cb_list = g_hash_table_lookup(__callback_info_hash,
+                       GUINT_TO_POINTER(key_info->local_id));
+       if (cb_list == NULL) {
+               _LOGI("local_info : %d is already released", key_info->local_id);
+               return false;
+       }
+
+       cb_list = g_list_find(cb_list, key_info->callback_info);
+       if (cb_list == NULL) {
+               _LOGI("local_info : %d is already released", key_info->local_id);
+               return false;
+       }
+
+       return true;
+}
+
+static bool __validate_local_info(callback_key_info_s *key_info)
+{
+       GList *cb_list;
+
+       cb_list = g_hash_table_lookup(__local_port_info,
+                       GUINT_TO_POINTER(key_info->local_id));
+       if (cb_list == NULL) {
+               _LOGI("local_info : %d is already released", key_info->local_id);
+               return false;
+       }
+
+       return true;
+}
+
 static gboolean __socket_request_handler(GIOChannel *gio,
                GIOCondition cond,
                gpointer data)
 {
        int fd = 0;
-       message_port_callback_info_s *mi;
-       message_port_pkt_s *pkt;
+       message_port_callback_info_s *mi = NULL;
+       callback_key_info_s *key_info;
+       message_port_pkt_s *pkt = NULL;
        message_port_local_port_info_s *local_port_info;
        bundle *kb = NULL;
-       GError *error = NULL;
-       bool ret = TRUE;
-
-       mi = (message_port_callback_info_s *)data;
-       if (mi == NULL) {
+       bool ret = true;
+       bool existed = true;
 
-               g_io_channel_shutdown(gio, TRUE, &error);
-               if (error) {
-                       _LOGE("g_io_channel_shutdown error : %s", error->message);
-                       g_error_free(error);
-               }
-               g_io_channel_unref(gio);
+       key_info = (callback_key_info_s *)data;
+       if (key_info == NULL)
                return FALSE;
+
+       pthread_mutex_lock(&mutex);
+       if (__validate_callback_info(key_info) == false) {
+               ret = FALSE;
+               existed = FALSE;
+               pthread_mutex_unlock(&mutex);
+               goto out;
        }
 
+       if (__validate_local_info(key_info) == false) {
+               ret = FALSE;
+               pthread_mutex_unlock(&mutex);
+               goto out;
+       }
+       pthread_mutex_unlock(&mutex);
+
+       mi = key_info->callback_info;
+
        local_port_info = mi->local_info;
        if (local_port_info == NULL || local_port_info->callback == NULL) {
                _LOGE("Failed to get callback info");
-               __callback_info_free_by_info(mi);
-               return FALSE;
+               ret = FALSE;
+               goto out;
        }
 
        if (cond == G_IO_HUP) {
                _LOGI("socket G_IO_HUP");
-               __callback_info_free_by_info(mi);
-               return FALSE;
+               ret = FALSE;
+               goto out;
        }
 
        fd = g_io_channel_unix_get_fd(gio);
        if (fd < 0) {
                _LOGE("fail to get fd from io channel");
-               __callback_info_free_by_info(mi);
-               return FALSE;
+               ret = FALSE;
+               goto out;
        }
 
        pkt = __message_port_recv_raw(fd);
        if (pkt == NULL) {
                _LOGE("recv error on SOCKET");
-               __callback_info_free_by_info(mi);
-               return FALSE;
+               ret = FALSE;
+               goto out;
        }
 
        kb = bundle_decode(pkt->data, pkt->data_len);
        if (!kb) {
                _LOGE("Invalid argument : message");
-               __callback_info_free_by_info(mi);
                ret = FALSE;
                goto out;
        }
@@ -353,9 +410,114 @@ out:
                free(pkt);
        }
 
+       if (mi && ret == FALSE && existed == TRUE) {
+               pthread_mutex_lock(&mutex);
+               __callback_info_free_by_info(mi);
+               pthread_mutex_unlock(&mutex);
+       }
+
        return ret;
 }
 
+
+static void __socket_destroy_handler(gpointer data)
+{
+       _LOGI("__socket_destroy_handler");
+       callback_key_info_s *key_info = (callback_key_info_s *)data;
+       free(key_info);
+}
+
+static callback_key_info_s *__create_callback_key_info(message_port_callback_info_s *callback_info)
+{
+       callback_key_info_s *_key_info = (callback_key_info_s *)
+               calloc(1, sizeof(callback_key_info_s));
+
+       if (_key_info == NULL) {
+               _LOGE("out of memory");
+               return NULL;
+       }
+
+       _key_info->local_id = callback_info->local_id;
+       _key_info->callback_info = callback_info;
+
+       return _key_info;
+}
+
+static message_port_callback_info_s *__create_callback_info(message_port_local_port_info_s *mi, char *local_appid)
+{
+       message_port_local_port_info_s *local_info = NULL;
+       message_port_callback_info_s *callback_info = NULL;
+       bool ret = true;
+
+       callback_info = (message_port_callback_info_s *)calloc(1, sizeof(message_port_callback_info_s));
+       if (callback_info == NULL) {
+               _LOGE("out of memory");
+               return NULL;
+       }
+
+       local_info = (message_port_local_port_info_s *)calloc(1, sizeof(message_port_local_port_info_s));
+       if (local_info == NULL) {
+               ret = false;
+               _LOGE("out of memory");
+               goto out;
+       }
+
+       local_info->port_name = strdup(mi->port_name);
+       if (local_info->port_name == NULL) {
+               ret = false;
+               _LOGE("out of memory");
+               goto out;
+       }
+
+       local_info->callback = mi->callback;
+       local_info->is_trusted = mi->is_trusted;
+       local_info->local_id = mi->local_id;
+       local_info->user_data = mi->user_data;
+
+       callback_info->local_id = local_info->local_id;
+       callback_info->local_info = local_info;
+       callback_info->remote_app_id = strdup(local_appid);
+       if (callback_info->remote_app_id == NULL) {
+               ret = false;
+               _LOGE("out of memory");
+       }
+
+out:
+       if (ret == false) {
+               __callback_info_free(callback_info);
+               return NULL;
+       }
+
+       return callback_info;
+}
+
+static bool __callback_info_append(message_port_callback_info_s *callback_info)
+{
+       GList *callback_info_list = NULL;
+       message_port_callback_info_s *head_callback_info;
+
+       callback_info_list = g_hash_table_lookup(__callback_info_hash, GUINT_TO_POINTER(callback_info->local_id));
+       if (callback_info_list == NULL) {
+               head_callback_info = (message_port_callback_info_s *)calloc(1, sizeof(message_port_callback_info_s));
+               if (head_callback_info == NULL) {
+                       _LOGE("fail to alloc head_callback_info");
+                       return false;
+               }
+               head_callback_info->local_id = 0;
+               head_callback_info->remote_app_id = NULL;
+               head_callback_info->local_info = NULL;
+               head_callback_info->gio_read = NULL;
+               head_callback_info->g_src_id = 0;
+               callback_info_list = g_list_append(callback_info_list, head_callback_info);
+               callback_info_list = g_list_append(callback_info_list, callback_info);
+               g_hash_table_insert(__callback_info_hash, GUINT_TO_POINTER(callback_info->local_id), callback_info_list);
+       } else {
+               callback_info_list = g_list_append(callback_info_list, callback_info);
+       }
+
+       return true;
+}
+
 static bool __receive_message(GVariant *parameters, GDBusMethodInvocation *invocation)
 {
        char *local_port = NULL;
@@ -372,8 +534,7 @@ static bool __receive_message(GVariant *parameters, GDBusMethodInvocation *invoc
        message_port_local_port_info_s *mi;
        int local_reg_id = 0;
        message_port_callback_info_s *callback_info = NULL;
-       message_port_callback_info_s *head_callback_info;
-       GList *callback_info_list = NULL;
+       callback_key_info_s *key_info;
 
        char buf[1024];
        GDBusMessage *msg;
@@ -394,14 +555,26 @@ static bool __receive_message(GVariant *parameters, GDBusMethodInvocation *invoc
                _LOGE("Invalid argument : remote_appid is NULL");
                goto out;
        }
+
+       if (!local_appid) {
+               _LOGE("Invalid argument : local_appid");
+               goto out;
+       }
+
+       pthread_mutex_lock(&mutex);
        if (!is_local_port_registed(remote_port, remote_trusted, &local_reg_id, &mi)) {
                _LOGE("Invalid argument : remote_port:(%s) trusted(%d)", remote_port, remote_trusted);
+               pthread_mutex_unlock(&mutex);
                goto out;
        }
-       if (!local_appid) {
-               _LOGE("Invalid argument : local_appid");
+
+       callback_info = __create_callback_info(mi, local_appid);
+       if (callback_info == NULL) {
+               pthread_mutex_unlock(&mutex);
                goto out;
        }
+       pthread_mutex_unlock(&mutex);
+
        if (!local_port) {
                _LOGE("Invalid argument : local_port");
                goto out;
@@ -410,7 +583,7 @@ static bool __receive_message(GVariant *parameters, GDBusMethodInvocation *invoc
                _LOGE("Invalid argument : remote_appid (%s)", remote_appid);
                goto out;
        }
-       if (strcmp(remote_port, mi->port_name) != 0) {
+       if (strcmp(remote_port, callback_info->local_info->port_name) != 0) {
                _LOGE("Invalid argument : remote_port (%s)", remote_port);
                goto out;
        }
@@ -433,19 +606,22 @@ static bool __receive_message(GVariant *parameters, GDBusMethodInvocation *invoc
                }
        }
 
-       callback_info = (message_port_callback_info_s *)calloc(1, sizeof(message_port_callback_info_s));
-       if (callback_info == NULL) {
-               _LOGE("out of memory");
+       data = bundle_decode(raw, len);
+       if (!data) {
+               _LOGE("Invalid argument : message");
                goto out;
        }
 
-       callback_info->local_id = mi->local_id;
-       callback_info->local_info = mi;
-       callback_info->remote_app_id = strdup(local_appid);
-       if (callback_info->remote_app_id == NULL) {
-               _LOGE("out of memory");
-               goto out;
-       }
+       LOGD("call calback %s", local_appid);
+       if (bi_dir)
+               callback_info->local_info->callback(callback_info->local_info->local_id,
+                       local_appid, local_port, local_trusted, data, callback_info->local_info->user_data);
+       else
+               callback_info->local_info->callback(callback_info->local_info->local_id,
+                       local_appid, NULL, false, data, callback_info->local_info->user_data);
+       bundle_free(data);
+
+       ret = true;
 
        msg = g_dbus_method_invocation_get_message(invocation);
        fd_list = g_dbus_message_get_unix_fd_list(msg);
@@ -455,61 +631,50 @@ static bool __receive_message(GVariant *parameters, GDBusMethodInvocation *invoc
                returned_fds = g_unix_fd_list_steal_fds(fd_list, &fd_len);
                if (returned_fds == NULL) {
                        _LOGE("fail to get fds");
+                       ret = false;
                        goto out;
                }
                fd = returned_fds[0];
 
                LOGI("g_unix_fd_list_get %d fd: [%d]", fd_len, fd);
                if (fd > 0) {
-
                        callback_info->gio_read = g_io_channel_unix_new(fd);
                        if (!callback_info->gio_read) {
                                _LOGE("Error is %s\n", strerror_r(errno, buf, sizeof(buf)));
+                               ret = false;
+                               goto out;
+                       }
+
+                       key_info = __create_callback_key_info(callback_info);
+                       if (key_info == NULL) {
+                               _LOGE("out of memory");
+                               ret = false;
                                goto out;
                        }
 
-                       callback_info->g_src_id = g_io_add_watch(callback_info->gio_read, G_IO_IN | G_IO_HUP,
-                                       __socket_request_handler, (gpointer)callback_info);
+                       callback_info->g_src_id = g_io_add_watch_full(
+                                       callback_info->gio_read,
+                                       G_PRIORITY_DEFAULT,
+                                       G_IO_IN | G_IO_HUP,
+                                       __socket_request_handler,
+                                       (gpointer)key_info,
+                                       __socket_destroy_handler);
                        if (callback_info->g_src_id == 0) {
                                _LOGE("fail to add watch on socket");
+                               free(key_info);
+                               ret = false;
                                goto out;
                        }
 
-                       callback_info_list = g_hash_table_lookup(__callback_info_hash, GUINT_TO_POINTER(mi->local_id));
-                       if (callback_info_list == NULL) {
-                               head_callback_info = (message_port_callback_info_s *)calloc(1, sizeof(message_port_callback_info_s));
-                               if (head_callback_info == NULL) {
-                                       _LOGE("fail to alloc head_callback_info");
-                                       goto out;
-                               }
-                               head_callback_info->local_id = 0;
-                               head_callback_info->remote_app_id = NULL;
-                               head_callback_info->local_info = NULL;
-                               head_callback_info->gio_read = NULL;
-                               head_callback_info->g_src_id = 0;
-                               callback_info_list = g_list_append(callback_info_list, head_callback_info);
-                               callback_info_list = g_list_append(callback_info_list, callback_info);
-                               g_hash_table_insert(__callback_info_hash, GUINT_TO_POINTER(mi->local_id), callback_info_list);
-                       } else {
-                               callback_info_list = g_list_append(callback_info_list, callback_info);
+                       pthread_mutex_lock(&mutex);
+                       if (__callback_info_append(callback_info) == false) {
+                               _LOGE("fail to append callback_info");
+                               ret = false;
                        }
+                       pthread_mutex_unlock(&mutex);
                }
        }
 
-       data = bundle_decode(raw, len);
-       if (!data) {
-               _LOGE("Invalid argument : message");
-               goto out;
-       }
-
-       LOGD("call calback %s", local_appid);
-       if (bi_dir)
-               mi->callback(mi->local_id, local_appid, local_port, local_trusted, data, mi->user_data);
-       else
-               mi->callback(mi->local_id, local_appid, NULL, false, data, mi->user_data);
-       bundle_free(data);
-
-       ret = true;
 out:
        if (ret == false)
                __callback_info_free(callback_info);
@@ -865,7 +1030,6 @@ int unregister_local_port(int local_port_id, bool trusted_port)
                        return MESSAGE_PORT_ERROR_INVALID_PARAMETER;
                }
        }
-
        g_hash_table_remove(__local_port_info, GINT_TO_POINTER(local_port_id));
 
        return MESSAGE_PORT_ERROR_NONE;