#define MSG "msg"
#define DATA "data"
#define PID "pid"
+#define DATA_WORKER_QDATA_MAX_SIZE (3840 * 2160 * 4) /* UHD BGRA8888 */
typedef struct muse_recv_data_head {
unsigned int marker;
static muse_core_ipc_t *g_muse_core_ipc = NULL;
+static gboolean __muse_core_ipc_receive_message(int fd, char *buffer, int msg_len);
+static gboolean __muse_core_ipc_data_processing(int fd, muse_recv_data_head_t *header, muse_core_channel_info_t *ch);
static void _muse_core_ipc_client_cleanup(muse_module_h module);
static void _muse_core_ipc_foreach_get_pid_count(gpointer key, gpointer value, gpointer user_data);
static void _muse_core_ipc_add_pid_handle_count(int pid, int value);
static gpointer _muse_core_ipc_dispatch_worker(gpointer data);
static gpointer _muse_core_ipc_data_worker(gpointer data);
-static muse_recv_data_t *_muse_core_ipc_new_qdata(char **recv_buf, int recv_size, int *alloc_size);
static size_t _muse_core_ipc_get_complete_msg_len(int sock_fd, char *msg);
static int _muse_core_ipc_verify_msg_complete(int sock_fd, char *msg, int msg_len);
static gboolean _muse_core_ipc_init_bufmgr(void);
return NULL;
}
-static gpointer _muse_core_ipc_data_worker(gpointer data)
+
+static gboolean __muse_core_ipc_receive_message(int fd, char *buffer, int msg_len)
{
+ int offset = 0;
int recv_len = 0;
- int cur_len = 0;
- intptr_t fd = (intptr_t) data;
- muse_module_h module = NULL;
- char *recv_buf = NULL;
- char *new_buf = NULL;
- int alloc_size = 0;
- int qdata_size = 0;
- void *jobj = NULL;
- intptr_t module_addr = 0;
- muse_recv_data_t *qdata = NULL;
- muse_core_channel_info_t *ch = NULL;
char err_msg[MUSE_MAX_MSG_LEN] = {'\0',};
- g_return_val_if_fail(fd > 0, NULL);
- LOGD("Enter");
+ if (fd <= 0 || !buffer || msg_len <= 0) {
+ LOGE("invalid param : fd %d, buffer %p, msg_len %d", fd, buffer, msg_len);
+ return FALSE;
+ }
- while (1) {
- if (!recv_buf) {
- alloc_size = MUSE_MSG_MAX_LENGTH;
- recv_buf = g_try_new0(char, alloc_size);
- }
- if (!recv_buf) {
- LOGE("Out of memory");
- break;
+ do {
+ recv_len = recv(fd, buffer + offset, msg_len - offset, 0);
+ if (recv_len <= 0) {
+ strerror_r(errno, err_msg, MUSE_MAX_MSG_LEN);
+ LOGE("[fd %d] recv : %s (%d)", fd, err_msg, errno);
+ return FALSE;
}
- recv_len = recv(fd, recv_buf + cur_len, alloc_size - cur_len, 0);
- cur_len += recv_len;
- LOGI("buff %p, recv_len %d, cur_len %d, alloc_size %d", recv_buf, recv_len, cur_len, alloc_size);
- if (recv_len <= 0) {
- if (recv_len != 0) {
- strerror_r(errno, err_msg, MUSE_MAX_MSG_LEN);
- LOGE("[%d] recv : %s (%d)", fd, err_msg, errno);
- }
- break;
- } else {
- if (module) {
- while ((qdata = _muse_core_ipc_new_qdata(&recv_buf, cur_len, &alloc_size))) {
- qdata_size = qdata->header.size + sizeof(muse_recv_data_head_t);
- if (cur_len > qdata_size) {
- alloc_size = alloc_size - qdata_size;
- new_buf = g_try_new0(char, alloc_size);
- if (!new_buf) {
- LOGE("Cannot allocate memory");
- break;
- }
- memcpy(new_buf, recv_buf + qdata_size, cur_len - qdata_size);
- recv_buf = new_buf;
- }
- ch = &module->ch[MUSE_CHANNEL_DATA];
- g_mutex_lock(&ch->mutex);
- g_queue_push_tail(ch->queue, qdata);
- g_cond_signal(&ch->cond);
- g_mutex_unlock(&ch->mutex);
-
- cur_len -= qdata_size;
- if (!cur_len) {
- recv_buf = NULL;
- break;
- }
- }
- } else {
- module_addr = 0;
+ offset += recv_len;
+ } while (offset < msg_len);
- jobj = muse_core_msg_json_object_new(recv_buf, &recv_len, NULL);
- if (jobj) {
- if (muse_core_msg_json_object_get_value(MUSE_MODULE_ADDR, jobj, &module_addr, MUSE_TYPE_POINTER))
- module = (muse_module_h) module_addr;
- muse_core_msg_json_object_free(jobj);
- }
+ if (offset != msg_len) {
+ LOGE("invalid length received : try %d -> result %d", msg_len, offset);
+ return FALSE;
+ }
- LOGD("module : %p", module);
+ return TRUE;
+}
- if (module) {
- module->ch[MUSE_CHANNEL_DATA].p_gthread = g_thread_self();
- if (!module->ch[MUSE_CHANNEL_DATA].p_gthread) {
- MUSE_FREE(recv_buf);
- return NULL;
- }
- module->ch[MUSE_CHANNEL_DATA].fd = fd;
- } else {
- LOGE("we have to check %s", recv_buf);
- muse_core_log_process_info((int)g_muse_core_ipc->pid);
- }
- MUSE_FREE(recv_buf);
- cur_len = 0;
- }
- }
+static gboolean __muse_core_ipc_data_processing(int fd, muse_recv_data_head_t *header, muse_core_channel_info_t *ch)
+{
+ char *raw_data = NULL;
+
+ if (!(fd > 0 && header && ch)) {
+ LOGE("invalid param %d %p %p", fd, header, ch);
+ goto _PROCESSING_FAILED;
}
- MUSE_FREE(recv_buf);
+ /* check marker */
+ if (header->marker != MUSE_DATA_HEAD) {
+ LOGE("invalid marker 0x%x", header->marker);
+ goto _PROCESSING_FAILED;
+ }
- LOGD("Leave");
- return NULL;
+ /* check data size */
+ if (header->size > DATA_WORKER_QDATA_MAX_SIZE) {
+ LOGE("invalid data size %d", header->size);
+ goto _PROCESSING_FAILED;
+ }
+
+ /* allocation data */
+ raw_data = (char *)g_try_new0(char, header->size + sizeof(muse_recv_data_head_t));
+ if (!raw_data) {
+ LOGE("failed to alloc data %d + %d", header->size, sizeof(muse_recv_data_head_t));
+ goto _PROCESSING_FAILED;
+ }
+
+ /* copy header */
+ memcpy(raw_data, header, sizeof(muse_recv_data_head_t));
+
+ /* receive data */
+ if (!__muse_core_ipc_receive_message(fd, raw_data + sizeof(muse_recv_data_head_t), header->size)) {
+ LOGE("receive data failed - length %d", header->size);
+ goto _PROCESSING_FAILED;
+ }
+
+ /* push data */
+ g_mutex_lock(&ch->mutex);
+ g_queue_push_tail(ch->queue, (gpointer)raw_data);
+ g_cond_signal(&ch->cond);
+ g_mutex_unlock(&ch->mutex);
+
+ return TRUE;
+
+_PROCESSING_FAILED:
+
+ MUSE_G_FREE(raw_data);
+
+ muse_core_log_process_info((int)g_muse_core_ipc->pid);
+
+ return FALSE;
}
-static muse_recv_data_t *_muse_core_ipc_new_qdata(char **recv_buf, int recv_size, int *alloc_size)
+
+static gpointer _muse_core_ipc_data_worker(gpointer data)
{
- int qdata_size, old_size;
- muse_recv_data_t *qdata = (muse_recv_data_t *)*recv_buf;
- g_return_val_if_fail(recv_buf, NULL);
+ void *jobj = NULL;
+ intptr_t module_addr = 0;
+ char recv_buf[MUSE_MAX_MSG_LEN] = {'\0',};
+ intptr_t fd = (intptr_t)data;
+ muse_module_h module = NULL;
+ muse_core_channel_info_t *ch = NULL;
+
+ g_return_val_if_fail(fd > 0, NULL);
+
+ LOGW("Enter - fd %d", fd);
- if (qdata->header.marker != MUSE_DATA_HEAD) {
- LOGE("[%p] Invalid data header header.marker : %x recv_size : %d alloc_size : %d ",
- qdata, qdata->header.marker, recv_size, *alloc_size);
- return NULL;
+ /* get module */
+ if (recv(fd, recv_buf, MUSE_MAX_MSG_LEN, 0) <= 0) {
+ LOGE("failed to receive message for module [errno %d]", errno);
+ goto _WORKER_FAILED;
}
- qdata_size = qdata->header.size + sizeof(muse_recv_data_head_t);
- if (qdata_size > recv_size) {
- LOGI("recv is not completed");
- if (qdata_size > *alloc_size) {
- old_size = *alloc_size;
- LOGI("Realloc %d -> %d", *alloc_size, qdata_size);
- *alloc_size = qdata_size;
- *recv_buf = g_try_renew(char, *recv_buf, *alloc_size);
- if (*recv_buf)
- memset(*recv_buf + old_size, 0x0, qdata_size - old_size);
- else
- LOGE("Cannot allocate memory");
+ jobj = muse_core_msg_json_object_new(recv_buf, NULL, NULL);
+ if (jobj) {
+ if (muse_core_msg_json_object_get_value(MUSE_MODULE_ADDR, jobj, &module_addr, MUSE_TYPE_POINTER))
+ module = (muse_module_h)module_addr;
+
+ muse_core_msg_json_object_free(jobj);
+ }
+
+ if (!module) {
+ LOGE("failed to get module from message [%s]", recv_buf);
+ goto _WORKER_FAILED;
+ }
+
+ LOGW("module : %p", module);
+
+ module->ch[MUSE_CHANNEL_DATA].p_gthread = g_thread_self();
+ if (!module->ch[MUSE_CHANNEL_DATA].p_gthread) {
+ LOGE("g_thread_self failed");
+ goto _WORKER_FAILED;
+ }
+
+ module->ch[MUSE_CHANNEL_DATA].fd = fd;
+ ch = &module->ch[MUSE_CHANNEL_DATA];
+
+ /* get data */
+ while (1) {
+ if (!__muse_core_ipc_receive_message(fd, recv_buf, sizeof(muse_recv_data_head_t))) {
+ LOGE("receive header failed");
+ break;
+ }
+
+ if (!__muse_core_ipc_data_processing(fd, (muse_recv_data_head_t *)recv_buf, ch)) {
+ LOGE("ipc data processing failed");
+ break;
}
- return NULL;
}
- return qdata;
+ LOGW("Leave");
+
+ return NULL;
+
+_WORKER_FAILED:
+ muse_core_log_process_info((int)g_muse_core_ipc->pid);
+
+ return NULL;
}
+
static size_t _muse_core_ipc_get_complete_msg_len(int sock_fd, char *msg)
{
size_t max_diff = 0;
void muse_core_ipc_delete_data(char *data)
{
- muse_recv_data_t *qdata;
- g_return_if_fail(data);
+ char *raw_data = NULL;
- qdata = (muse_recv_data_t *)(data - sizeof(muse_recv_data_head_t));
- if (qdata && qdata->header.marker == MUSE_DATA_HEAD)
- MUSE_FREE(qdata);
+ if (!data) {
+ LOGE("NULL data");
+ return;
+ }
+
+ raw_data = data - sizeof(muse_recv_data_head_t);
+
+ MUSE_G_FREE(raw_data);
}
char *muse_core_ipc_get_data(muse_module_h module)
{
- muse_recv_data_t *qdata;
muse_core_channel_info_t *ch;
char *raw_data = NULL;
g_return_val_if_fail(ch->queue, NULL);
g_mutex_lock(&ch->mutex);
- while ((qdata = g_queue_pop_head(ch->queue)) == NULL) {
+ while ((raw_data = (char *)g_queue_pop_head(ch->queue)) == NULL) {
if (!g_cond_wait_until(&ch->cond, &ch->mutex, end_time)) {
- LOGE("timeout has passed");
+ LOGE("[%d] timeout has passed", ch->fd);
break;
}
}
g_mutex_unlock(&ch->mutex);
- if (qdata)
- raw_data = (char *)qdata + sizeof(muse_recv_data_head_t);
- else
- LOGE("qdata is NULL");
+ if (!raw_data)
+ LOGE("NULL raw_data");
- return raw_data;
+ return (raw_data + sizeof(muse_recv_data_head_t));
}
bool muse_core_ipc_get_data_info(char *data, uint64_t *data_id, int *size)