Change message receive mechanism for data worker 00/123600/11 submit/tizen_3.0/20170410.001748
authorJeongmo Yang <jm80.yang@samsung.com>
Thu, 6 Apr 2017 07:51:17 +0000 (16:51 +0900)
committerJeongmo Yang <jm80.yang@samsung.com>
Fri, 7 Apr 2017 08:50:38 +0000 (17:50 +0900)
1. get message for module
2. get data recursively
 a. get header first
 b. check data size and allocate
 c. receive data recursively until received size is same with data size

[Version] 0.1.63
[Profile] Common
[Issue Type] Update
[Dependency module] N/A
[Test] [M(T) - Boot=(OK), sdb=(OK), Home=(OK), Touch=(OK), Version=tizen-mobile_20170403.2]

Change-Id: I4f4ac0ab0b6cb85b1da43035d83a6375bf4f5b05
Signed-off-by: Jeongmo Yang <jm80.yang@samsung.com>
packaging/mused.spec
src/muse_core_ipc.c

index 6a6a8a0..27a73f4 100644 (file)
@@ -1,6 +1,6 @@
 Name:       mused
 Summary:    A Multimedia Daemon in Tizen Native API
-Version:    0.1.62
+Version:    0.1.63
 Release:    0
 Group:      System/Libraries
 License:    Apache-2.0
index a3fd8ea..ae98c6e 100644 (file)
@@ -32,6 +32,7 @@
 #define MSG "msg"
 #define DATA "data"
 #define PID "pid"
+#define DATA_WORKER_QDATA_MAX_SIZE      (3840 * 2160 * 4) /* UHD BGRA8888 */
 
 typedef struct muse_recv_data_head {
        unsigned int marker;
@@ -46,12 +47,13 @@ typedef struct muse_recv_data {
 
 static muse_core_ipc_t *g_muse_core_ipc = NULL;
 
+static gboolean __muse_core_ipc_receive_message(int fd, char *buffer, int msg_len);
+static gboolean __muse_core_ipc_data_processing(int fd, muse_recv_data_head_t *header, muse_core_channel_info_t *ch);
 static void _muse_core_ipc_client_cleanup(muse_module_h module);
 static void _muse_core_ipc_foreach_get_pid_count(gpointer key, gpointer value, gpointer user_data);
 static void _muse_core_ipc_add_pid_handle_count(int pid, int value);
 static gpointer _muse_core_ipc_dispatch_worker(gpointer data);
 static gpointer _muse_core_ipc_data_worker(gpointer data);
-static muse_recv_data_t *_muse_core_ipc_new_qdata(char **recv_buf, int recv_size, int *alloc_size);
 static size_t _muse_core_ipc_get_complete_msg_len(int sock_fd, char *msg);
 static int _muse_core_ipc_verify_msg_complete(int sock_fd, char *msg, int msg_len);
 static gboolean _muse_core_ipc_init_bufmgr(void);
@@ -231,137 +233,160 @@ static gpointer _muse_core_ipc_dispatch_worker(gpointer data)
        return NULL;
 }
 
-static gpointer _muse_core_ipc_data_worker(gpointer data)
+
+static gboolean __muse_core_ipc_receive_message(int fd, char *buffer, int msg_len)
 {
+       int offset = 0;
        int recv_len = 0;
-       int cur_len = 0;
-       intptr_t fd = (intptr_t) data;
-       muse_module_h module = NULL;
-       char *recv_buf = NULL;
-       char *new_buf = NULL;
-       int alloc_size = 0;
-       int qdata_size = 0;
-       void *jobj = NULL;
-       intptr_t module_addr = 0;
-       muse_recv_data_t *qdata = NULL;
-       muse_core_channel_info_t *ch = NULL;
        char err_msg[MUSE_MAX_MSG_LEN] = {'\0',};
-       g_return_val_if_fail(fd > 0, NULL);
 
-       LOGD("Enter");
+       if (fd <= 0 || !buffer || msg_len <= 0) {
+               LOGE("invalid param : fd %d, buffer %p, msg_len %d", fd, buffer, msg_len);
+               return FALSE;
+       }
 
-       while (1) {
-               if (!recv_buf) {
-                       alloc_size = MUSE_MSG_MAX_LENGTH;
-                       recv_buf = g_try_new0(char, alloc_size);
-               }
-               if (!recv_buf) {
-                       LOGE("Out of memory");
-                       break;
+       do {
+               recv_len = recv(fd, buffer + offset, msg_len - offset, 0);
+               if (recv_len <= 0) {
+                       strerror_r(errno, err_msg, MUSE_MAX_MSG_LEN);
+                       LOGE("[fd %d] recv : %s (%d)", fd, err_msg, errno);
+                       return FALSE;
                }
-               recv_len = recv(fd, recv_buf + cur_len, alloc_size - cur_len, 0);
-               cur_len += recv_len;
 
-               LOGI("buff %p, recv_len %d, cur_len %d, alloc_size %d", recv_buf, recv_len, cur_len, alloc_size);
-               if (recv_len <= 0) {
-                       if (recv_len != 0) {
-                               strerror_r(errno, err_msg, MUSE_MAX_MSG_LEN);
-                               LOGE("[%d] recv : %s (%d)", fd, err_msg, errno);
-                       }
-                       break;
-               } else {
-                       if (module) {
-                               while ((qdata = _muse_core_ipc_new_qdata(&recv_buf, cur_len, &alloc_size))) {
-                                       qdata_size = qdata->header.size + sizeof(muse_recv_data_head_t);
-                                       if (cur_len > qdata_size) {
-                                               alloc_size = alloc_size - qdata_size;
-                                               new_buf = g_try_new0(char, alloc_size);
-                                               if (!new_buf) {
-                                                       LOGE("Cannot allocate memory");
-                                                       break;
-                                               }
-                                               memcpy(new_buf, recv_buf + qdata_size, cur_len - qdata_size);
-                                               recv_buf = new_buf;
-                                       }
-                                       ch = &module->ch[MUSE_CHANNEL_DATA];
-                                       g_mutex_lock(&ch->mutex);
-                                       g_queue_push_tail(ch->queue, qdata);
-                                       g_cond_signal(&ch->cond);
-                                       g_mutex_unlock(&ch->mutex);
-
-                                       cur_len -= qdata_size;
-                                       if (!cur_len) {
-                                               recv_buf = NULL;
-                                               break;
-                                       }
-                               }
-                       } else {
-                               module_addr = 0;
+               offset += recv_len;
+       } while (offset < msg_len);
 
-                               jobj = muse_core_msg_json_object_new(recv_buf, &recv_len, NULL);
-                               if (jobj) {
-                                       if (muse_core_msg_json_object_get_value(MUSE_MODULE_ADDR, jobj, &module_addr, MUSE_TYPE_POINTER))
-                                               module = (muse_module_h) module_addr;
-                                       muse_core_msg_json_object_free(jobj);
-                               }
+       if (offset != msg_len) {
+               LOGE("invalid length received : try %d -> result %d", msg_len, offset);
+               return FALSE;
+       }
 
-                               LOGD("module : %p", module);
+       return TRUE;
+}
 
-                               if (module) {
-                                       module->ch[MUSE_CHANNEL_DATA].p_gthread = g_thread_self();
-                                       if (!module->ch[MUSE_CHANNEL_DATA].p_gthread) {
-                                               MUSE_FREE(recv_buf);
-                                               return NULL;
-                                       }
-                                       module->ch[MUSE_CHANNEL_DATA].fd = fd;
-                               } else {
-                                       LOGE("we have to check %s", recv_buf);
-                                       muse_core_log_process_info((int)g_muse_core_ipc->pid);
-                               }
 
-                               MUSE_FREE(recv_buf);
-                               cur_len = 0;
-                       }
-               }
+static gboolean __muse_core_ipc_data_processing(int fd, muse_recv_data_head_t *header, muse_core_channel_info_t *ch)
+{
+       char *raw_data = NULL;
+
+       if (!(fd > 0 && header && ch)) {
+               LOGE("invalid param %d %p %p", fd, header, ch);
+               goto _PROCESSING_FAILED;
        }
 
-       MUSE_FREE(recv_buf);
+       /* check marker */
+       if (header->marker != MUSE_DATA_HEAD) {
+               LOGE("invalid marker 0x%x", header->marker);
+               goto _PROCESSING_FAILED;
+       }
 
-       LOGD("Leave");
-       return NULL;
+       /* check data size */
+       if (header->size > DATA_WORKER_QDATA_MAX_SIZE) {
+               LOGE("invalid data size %d", header->size);
+               goto _PROCESSING_FAILED;
+       }
+
+       /* allocation data */
+       raw_data = (char *)g_try_new0(char, header->size + sizeof(muse_recv_data_head_t));
+       if (!raw_data) {
+               LOGE("failed to alloc data %d + %d", header->size, sizeof(muse_recv_data_head_t));
+               goto _PROCESSING_FAILED;
+       }
+
+       /* copy header */
+       memcpy(raw_data, header, sizeof(muse_recv_data_head_t));
+
+       /* receive data */
+       if (!__muse_core_ipc_receive_message(fd, raw_data + sizeof(muse_recv_data_head_t), header->size)) {
+               LOGE("receive data failed - length %d", header->size);
+               goto _PROCESSING_FAILED;
+       }
+
+       /* push data */
+       g_mutex_lock(&ch->mutex);
+       g_queue_push_tail(ch->queue, (gpointer)raw_data);
+       g_cond_signal(&ch->cond);
+       g_mutex_unlock(&ch->mutex);
+
+       return TRUE;
+
+_PROCESSING_FAILED:
+
+       MUSE_G_FREE(raw_data);
+
+       muse_core_log_process_info((int)g_muse_core_ipc->pid);
+
+       return FALSE;
 }
 
-static muse_recv_data_t *_muse_core_ipc_new_qdata(char **recv_buf, int recv_size, int *alloc_size)
+
+static gpointer _muse_core_ipc_data_worker(gpointer data)
 {
-       int qdata_size, old_size;
-       muse_recv_data_t *qdata = (muse_recv_data_t *)*recv_buf;
-       g_return_val_if_fail(recv_buf, NULL);
+       void *jobj = NULL;
+       intptr_t module_addr = 0;
+       char recv_buf[MUSE_MAX_MSG_LEN] = {'\0',};
+       intptr_t fd = (intptr_t)data;
+       muse_module_h module = NULL;
+       muse_core_channel_info_t *ch = NULL;
+
+       g_return_val_if_fail(fd > 0, NULL);
+
+       LOGW("Enter - fd %d", fd);
 
-       if (qdata->header.marker != MUSE_DATA_HEAD) {
-               LOGE("[%p] Invalid data header header.marker : %x recv_size : %d alloc_size : %d ",
-                       qdata, qdata->header.marker, recv_size, *alloc_size);
-               return NULL;
+       /* get module */
+       if (recv(fd, recv_buf, MUSE_MAX_MSG_LEN, 0) <= 0) {
+               LOGE("failed to receive message for module [errno %d]", errno);
+               goto _WORKER_FAILED;
        }
 
-       qdata_size = qdata->header.size + sizeof(muse_recv_data_head_t);
-       if (qdata_size > recv_size) {
-               LOGI("recv is not completed");
-               if (qdata_size > *alloc_size) {
-                       old_size = *alloc_size;
-                       LOGI("Realloc %d -> %d", *alloc_size, qdata_size);
-                       *alloc_size = qdata_size;
-                       *recv_buf = g_try_renew(char, *recv_buf, *alloc_size);
-                       if (*recv_buf)
-                               memset(*recv_buf + old_size, 0x0, qdata_size - old_size);
-                       else
-                               LOGE("Cannot allocate memory");
+       jobj = muse_core_msg_json_object_new(recv_buf, NULL, NULL);
+       if (jobj) {
+               if (muse_core_msg_json_object_get_value(MUSE_MODULE_ADDR, jobj, &module_addr, MUSE_TYPE_POINTER))
+                       module = (muse_module_h)module_addr;
+
+               muse_core_msg_json_object_free(jobj);
+       }
+
+       if (!module) {
+               LOGE("failed to get module from message [%s]", recv_buf);
+               goto _WORKER_FAILED;
+       }
+
+       LOGW("module : %p", module);
+
+       module->ch[MUSE_CHANNEL_DATA].p_gthread = g_thread_self();
+       if (!module->ch[MUSE_CHANNEL_DATA].p_gthread) {
+               LOGE("g_thread_self failed");
+               goto _WORKER_FAILED;
+       }
+
+       module->ch[MUSE_CHANNEL_DATA].fd = fd;
+       ch = &module->ch[MUSE_CHANNEL_DATA];
+
+       /* get data */
+       while (1) {
+               if (!__muse_core_ipc_receive_message(fd, recv_buf, sizeof(muse_recv_data_head_t))) {
+                       LOGE("receive header failed");
+                       break;
+               }
+
+               if (!__muse_core_ipc_data_processing(fd, (muse_recv_data_head_t *)recv_buf, ch)) {
+                       LOGE("ipc data processing failed");
+                       break;
                }
-               return NULL;
        }
 
-       return qdata;
+       LOGW("Leave");
+
+       return NULL;
+
+_WORKER_FAILED:
+       muse_core_log_process_info((int)g_muse_core_ipc->pid);
+
+       return NULL;
 }
 
+
 static size_t _muse_core_ipc_get_complete_msg_len(int sock_fd, char *msg)
 {
        size_t max_diff = 0;
@@ -790,17 +815,20 @@ int muse_core_ipc_push_data(int sock_fd, const char *data, int size, uint64_t da
 
 void muse_core_ipc_delete_data(char *data)
 {
-       muse_recv_data_t *qdata;
-       g_return_if_fail(data);
+       char *raw_data = NULL;
 
-       qdata = (muse_recv_data_t *)(data - sizeof(muse_recv_data_head_t));
-       if (qdata && qdata->header.marker == MUSE_DATA_HEAD)
-               MUSE_FREE(qdata);
+       if (!data) {
+               LOGE("NULL data");
+               return;
+       }
+
+       raw_data = data - sizeof(muse_recv_data_head_t);
+
+       MUSE_G_FREE(raw_data);
 }
 
 char *muse_core_ipc_get_data(muse_module_h module)
 {
-       muse_recv_data_t *qdata;
        muse_core_channel_info_t *ch;
        char *raw_data = NULL;
 
@@ -811,20 +839,18 @@ char *muse_core_ipc_get_data(muse_module_h module)
        g_return_val_if_fail(ch->queue, NULL);
 
        g_mutex_lock(&ch->mutex);
-       while ((qdata = g_queue_pop_head(ch->queue)) == NULL) {
+       while ((raw_data = (char *)g_queue_pop_head(ch->queue)) == NULL) {
                if (!g_cond_wait_until(&ch->cond, &ch->mutex, end_time)) {
-                       LOGE("timeout has passed");
+                       LOGE("[%d] timeout has passed", ch->fd);
                        break;
                }
        }
        g_mutex_unlock(&ch->mutex);
 
-       if (qdata)
-               raw_data = (char *)qdata + sizeof(muse_recv_data_head_t);
-       else
-               LOGE("qdata is NULL");
+       if (!raw_data)
+               LOGE("NULL raw_data");
 
-       return raw_data;
+       return (raw_data + sizeof(muse_recv_data_head_t));
 }
 
 bool muse_core_ipc_get_data_info(char *data, uint64_t *data_id, int *size)