From 381aab0b83daacd99d934757c4ff666afeb3a4ad Mon Sep 17 00:00:00 2001 From: Jeesun Kim Date: Wed, 20 Apr 2016 20:51:39 +0900 Subject: [PATCH] clean up: devied files, remove router Change-Id: I236ad34ef1c6f6a8f0f4f4d5f02964969d35aa94 --- LICENSE.APLv2 | 2 +- NOTICE | 2 +- include/pims-ipc-data.h | 2 +- include/pims-ipc-svc.h | 5 +- include/pims-ipc-types.h | 34 +- include/pims-ipc.h | 16 +- packaging/pims-ipc.spec | 2 +- src/pims-debug.h | 56 +- src/pims-internal.h | 9 +- src/pims-ipc-data-internal.h | 51 +- src/pims-ipc-data.c | 101 +-- src/pims-ipc-pubsub.c | 292 +++++++ src/pims-ipc-pubsub.h | 26 + src/pims-ipc-svc.c | 1853 +++--------------------------------------- src/pims-ipc-utils.c | 48 ++ src/pims-ipc-utils.h | 26 + src/pims-ipc-worker.c | 638 +++++++++++++++ src/pims-ipc-worker.h | 62 ++ src/pims-ipc.c | 518 ++++++------ src/pims-socket.c | 493 ++++++++++- src/pims-socket.h | 23 +- 21 files changed, 2075 insertions(+), 2184 deletions(-) create mode 100644 src/pims-ipc-pubsub.c create mode 100644 src/pims-ipc-pubsub.h create mode 100644 src/pims-ipc-utils.c create mode 100644 src/pims-ipc-utils.h create mode 100644 src/pims-ipc-worker.c create mode 100644 src/pims-ipc-worker.h diff --git a/LICENSE.APLv2 b/LICENSE.APLv2 index 2a14afe..d7beda0 100644 --- a/LICENSE.APLv2 +++ b/LICENSE.APLv2 @@ -1,4 +1,4 @@ -Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. +Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. Apache License Version 2.0, January 2004 diff --git a/NOTICE b/NOTICE index ce761e0..1634c12 100644 --- a/NOTICE +++ b/NOTICE @@ -1 +1 @@ -Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. +Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. diff --git a/include/pims-ipc-data.h b/include/pims-ipc-data.h index 1b0c6f2..8ed2362 100644 --- a/include/pims-ipc-data.h +++ b/include/pims-ipc-data.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. diff --git a/include/pims-ipc-svc.h b/include/pims-ipc-svc.h index f187411..4393202 100644 --- a/include/pims-ipc-svc.h +++ b/include/pims-ipc-svc.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -37,13 +37,10 @@ int pims_ipc_svc_deinit_for_publish(void); int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data); void pims_ipc_svc_run_main_loop(GMainLoop* main_loop); - void pims_ipc_svc_set_client_disconnected_cb(pims_ipc_svc_client_disconnected_cb callback, void *userdata); - int pims_ipc_svc_get_smack_label(pims_ipc_h ipc, char **p_smack); bool pims_ipc_svc_check_privilege(pims_ipc_h ipc, char *privilege); - #ifdef __cplusplus } #endif diff --git a/include/pims-ipc-types.h b/include/pims-ipc-types.h index 1ea5bc3..279fa08 100644 --- a/include/pims-ipc-types.h +++ b/include/pims-ipc-types.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -36,26 +36,26 @@ typedef void* pims_ipc_h; typedef void* pims_ipc_data_h; typedef enum { - PIMS_IPC_DATA_TYPE_INVALID, - PIMS_IPC_DATA_TYPE_CHAR, - PIMS_IPC_DATA_TYPE_UCHAR, - PIMS_IPC_DATA_TYPE_INT, - PIMS_IPC_DATA_TYPE_UINT, - PIMS_IPC_DATA_TYPE_LONG, - PIMS_IPC_DATA_TYPE_ULONG, - PIMS_IPC_DATA_TYPE_FLOAT, - PIMS_IPC_DATA_TYPE_DOUBLE, - PIMS_IPC_DATA_TYPE_STRING, - PIMS_IPC_DATA_TYPE_MEMORY, + PIMS_IPC_DATA_TYPE_INVALID, + PIMS_IPC_DATA_TYPE_CHAR, + PIMS_IPC_DATA_TYPE_UCHAR, + PIMS_IPC_DATA_TYPE_INT, + PIMS_IPC_DATA_TYPE_UINT, + PIMS_IPC_DATA_TYPE_LONG, + PIMS_IPC_DATA_TYPE_ULONG, + PIMS_IPC_DATA_TYPE_FLOAT, + PIMS_IPC_DATA_TYPE_DOUBLE, + PIMS_IPC_DATA_TYPE_STRING, + PIMS_IPC_DATA_TYPE_MEMORY, } pims_ipc_data_type_e; typedef void (*pims_ipc_svc_call_cb)(pims_ipc_h ipc, pims_ipc_data_h data_in, - pims_ipc_data_h *data_out, void *userdata); -typedef void (*pims_ipc_svc_client_disconnected_cb)(pims_ipc_h ipc, void *userdata); + pims_ipc_data_h *data_out, void *user_data); +typedef void (*pims_ipc_svc_client_disconnected_cb)(pims_ipc_h ipc, void *user_data); -typedef void (*pims_ipc_call_async_cb)(pims_ipc_h ipc, pims_ipc_data_h data_out, void *userdata); -typedef void (*pims_ipc_subscribe_cb)(pims_ipc_h ipc, pims_ipc_data_h data, void *userdata); -typedef void (*pims_ipc_server_disconnected_cb)(void *userdata); +typedef void (*pims_ipc_call_async_cb)(pims_ipc_h ipc, pims_ipc_data_h data_out, void *user_data); +typedef void (*pims_ipc_subscribe_cb)(pims_ipc_h ipc, pims_ipc_data_h data, void *user_data); +typedef void (*pims_ipc_server_disconnected_cb)(void *user_data); #ifdef __cplusplus diff --git a/include/pims-ipc.h b/include/pims-ipc.h index bf65c7a..04696f1 100644 --- a/include/pims-ipc.h +++ b/include/pims-ipc.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -30,16 +30,18 @@ extern "C" pims_ipc_h pims_ipc_create(char *service); void pims_ipc_destroy(pims_ipc_h ipc); int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, - pims_ipc_data_h *data_out); -int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, - pims_ipc_call_async_cb callback, void *userdata); -bool pims_ipc_is_call_in_progress(pims_ipc_h ipc); + pims_ipc_data_h *data_out); +int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, + pims_ipc_data_h data_in, pims_ipc_call_async_cb callback, void *user_data); +int pims_ipc_is_call_in_progress(pims_ipc_h ipc); pims_ipc_h pims_ipc_create_for_subscribe(char *service); void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc); -int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata); +int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, + pims_ipc_subscribe_cb callback, void *user_data); int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event); -int pims_ipc_add_server_disconnected_cb(pims_ipc_h ipc, pims_ipc_server_disconnected_cb callback, void *user_data); +int pims_ipc_add_server_disconnected_cb(pims_ipc_h ipc, + pims_ipc_server_disconnected_cb callback, void *user_data); int pims_ipc_remove_server_disconnected_cb(pims_ipc_h ipc); /* start deprecated */ diff --git a/packaging/pims-ipc.spec b/packaging/pims-ipc.spec index 27093ca..19685d3 100644 --- a/packaging/pims-ipc.spec +++ b/packaging/pims-ipc.spec @@ -1,6 +1,6 @@ Name: pims-ipc Summary: library for PIMs IPC -Version: 0.1.16 +Version: 0.1.17 Release: 1 Group: System/Libraries License: Apache-2.0 diff --git a/src/pims-debug.h b/src/pims-debug.h index d320160..eea2354 100644 --- a/src/pims-debug.h +++ b/src/pims-debug.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ #include -#define LOG_TAG "PIMS_IPC" +#define LOG_TAG "PIMS_IPC" #include #ifdef __cplusplus @@ -30,34 +30,50 @@ extern "C" { #endif -#define PIMS_VERBOSE_TAG(frmt, args...) SLOGV(frmt, ##args); -#define PIMS_DEBUG_TAG(frmt, args...) SLOGD(frmt, ##args); -#define PIMS_INFO_TAG(frmt, args...) SLOGI(frmt, ##args); -#define PIMS_WARN_TAG(frmt, args...) SLOGV(frmt, ##args); -#define PIMS_ERROR_TAG(frmt, args...) SLOGE(frmt, ##args); - +#define VERBOSE(frmt, args...) +#define DBG(frmt, args...) SLOGD(frmt, ##args) +#define INFO(frmt, args...) SLOGI(frmt, ##args) +#define WARN(frmt, args...) SLOGV(frmt, ##args) +#define ERR(frmt, args...) SLOGE(frmt, ##args) -#define ENTER() PIMS_DEBUG_TAG(">>>>>>>> called") -#define LEAVE() PIMS_DEBUG_TAG("<<<<<<<< ended") +#define FN_CALL() DBG(">>>>>>>> called") +#define FN_END() DBG("<<<<<<<< ended") -//#define VERBOSE(frmt, args...) PIMS_VERBOSE_TAG(frmt, ##args) -#define VERBOSE(frmt, args...) -#define DEBUG(frmt, args...) PIMS_DEBUG_TAG(frmt, ##args) -#define INFO(frmt, args...) PIMS_INFO_TAG(frmt, ##args) -#define WARNING(frmt, args...) PIMS_WARN_TAG(frmt, ##args) -#define ERROR(frmt, args...) PIMS_ERROR_TAG(frmt, ##args) +#define RET_IF(expr) do { \ + if (expr) { \ + ERR("(%s)", #expr); \ + return; \ + } \ +} while (0) +#define RETV_IF(expr, val) do { \ + if (expr) { \ + ERR("(%s)", #expr); \ + return (val); \ + } \ +} while (0) +#define RETM_IF(expr, fmt, arg...) do { \ + if (expr) { \ + ERR(fmt, ##arg); \ + return; \ + } \ +} while (0) +#define RETVM_IF(expr, val, fmt, arg...) do { \ + if (expr) { \ + ERR(fmt, ##arg); \ + return (val); \ + } \ +} while (0) #define WARN_IF(expr, fmt, arg...) do { \ if (expr) { \ - ERROR(fmt, ##arg); \ + ERR(fmt, ##arg); \ } \ } while (0) #define ASSERT(expr) \ - if (!(expr)) \ - { \ - ERROR("Assertion %s", #expr); \ + if (!(expr)) { \ + ERR("Assertion %s", #expr); \ } \ assert(expr) diff --git a/src/pims-internal.h b/src/pims-internal.h index 5034e63..a5f5d34 100644 --- a/src/pims-internal.h +++ b/src/pims-internal.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -16,10 +16,14 @@ * limitations under the License. */ - #ifndef __PIMS_INTERNAL_H__ #define __PIMS_INTERNAL_H__ +#include + +#include "pims-debug.h" + + #ifdef __cplusplus extern "C" { @@ -44,7 +48,6 @@ typedef struct { unsigned int free_size; char *pos; char *buf; - int flags; unsigned int created:1; unsigned int buf_alloced:1; } pims_ipc_data_s; diff --git a/src/pims-ipc-data-internal.h b/src/pims-ipc-data-internal.h index 329e953..f1d9b64 100644 --- a/src/pims-ipc-data-internal.h +++ b/src/pims-ipc-data-internal.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -16,11 +16,15 @@ * limitations under the License. */ - #ifndef __PIMS_IPC_DATA_INTERNAL_H__ #define __PIMS_IPC_DATA_INTERNAL_H__ +#include + #include +#include +#include +#include #ifdef __cplusplus extern "C" @@ -30,8 +34,51 @@ extern "C" pims_ipc_data_h pims_ipc_data_steal_unmarshal(void *buf, unsigned int size); +typedef struct { + char *service; + gid_t group; + mode_t mode; + + /* Global socket info and epoll thread */ + int sockfd; + + GHashTable *client_worker_map; /* key : client_id, value: worker_data */ + GList *client_id_fd_map; /* pims_ipc_client_map_s = client_id:client_fd */ + + int workers_max_count; + + pthread_mutex_t manager_list_hangup_mutex; + GList *manager_list_hangup; + + cynara *cynara; + pthread_mutex_t cynara_mutex; +} pims_ipc_svc_s; + +typedef struct { + char *client_id; + unsigned int client_id_len; + unsigned int seq_no; + char *call_id; + unsigned int call_id_len; + unsigned int has_data; + unsigned int data_len; + char *data; +} pims_ipc_raw_data_s; + +typedef struct { + int fd; + int client_fd; + int stop_thread; + GList *list; /* pims_ipc_raw_data_s list */ + pthread_mutex_t queue_mutex; + pthread_mutex_t ready_mutex; + pthread_cond_t ready; + pthread_mutex_t client_mutex; +} pims_ipc_worker_data_s; + #ifdef __cplusplus } #endif #endif /* __PIMS_IPC_DATA_INTERNAL_H__ */ + diff --git a/src/pims-ipc-data.c b/src/pims-ipc-data.c index 4d9662b..fbd6655 100644 --- a/src/pims-ipc-data.c +++ b/src/pims-ipc-data.c @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -25,21 +25,20 @@ #include #include "pims-internal.h" -#include "pims-debug.h" #include "pims-ipc-data.h" /* - Structure of data with type(4 bytes order) - +------------------------------------------------------------------+ - | type | size | data | pad | type | size | data | pad | - +------------------------------------------------------------------+ - 4 4 size 0-3 (Size of bytes) - - Structure of data without type(4 bytes order) - +----------------------------------------------------+ - | size | data | pad | size | data | pad | - +----------------------------------------------------+ - 4 size 0-3 (Size of bytes) + * Structure of data with type(4 bytes order) + * +------------------------------------------------------------------+ + * | type | size | data | pad | type | size | data | pad | + * +------------------------------------------------------------------+ + * 4 4 size 0-3 (Size of bytes) + * + * Structure of data without type(4 bytes order) + * +----------------------------------------------------+ + * | size | data | pad | size | data | pad | + * +----------------------------------------------------+ + * 4 size 0-3 (Size of bytes) */ #define _get_used_size_with_type(data_size) \ @@ -50,21 +49,26 @@ API pims_ipc_data_h pims_ipc_data_create_with_size(unsigned int size, int flags) { - // ENTER(); - int ret = -1; pims_ipc_data_s *handle = NULL; handle = calloc(1, sizeof(pims_ipc_data_s)); if (NULL == handle) { - ERROR("calloc() Fail"); + ERR("calloc() Fail"); return NULL; } + + if (flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE) { + ERR("Not supported with-type mode"); + free(handle); + return NULL; + } + handle->alloc_size = size; handle->free_size = size; handle->buf_size = 0; handle->buf = calloc(1, size); if (NULL == handle->buf) { - ERROR("calloc() Fail"); + ERR("calloc() Fail"); free(handle); return NULL; } @@ -72,19 +76,13 @@ API pims_ipc_data_h pims_ipc_data_create_with_size(unsigned int size, int flags) handle->created = 1; handle->buf_alloced = 1; - ret = pims_ipc_data_put(handle, &flags, sizeof(int)); - - ASSERT(ret == 0); - handle->flags = flags; - return handle; } API void pims_ipc_data_destroy(pims_ipc_data_h data) { - // ENTER(); - pims_ipc_data_s *handle = (pims_ipc_data_s *)data; - if (!handle) + pims_ipc_data_s *handle = data; + if (NULL == handle) return; if (handle->buf_alloced) @@ -95,24 +93,17 @@ API void pims_ipc_data_destroy(pims_ipc_data_h data) API int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size) { - // ENTER(); - pims_ipc_data_s *handle = NULL; + pims_ipc_data_s *handle = data; unsigned int dsize = size; unsigned int used_size = 0; - handle = (pims_ipc_data_s *)data; if (handle->created == 0) { - ERROR("This handle isn't create mode."); - return -1; - } - - if (handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE) { - ERROR("Not without-type mode"); + ERR("This handle isn't create mode."); return -1; } if (dsize > 0 && buf == NULL) { - ERROR("Invalid argument"); + ERR("Invalid argument"); return -1; } @@ -125,7 +116,7 @@ API int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size) new_size *= 2; handle->buf = realloc(handle->buf, new_size); if (NULL == handle->buf) { - ERROR("realloc() Fail"); + ERR("realloc() Fail"); return -1; } handle->alloc_size = new_size; @@ -154,30 +145,24 @@ API int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size) API void* pims_ipc_data_get(pims_ipc_data_h data, unsigned int *size) { - // ENTER(); - pims_ipc_data_s *handle = NULL; + pims_ipc_data_s *handle = data; unsigned int dsize = 0; unsigned int used_size = 0; void *buf = NULL; - handle = (pims_ipc_data_s *)data; if (handle->created == 1) { - ERROR("This handle is create mode."); + ERR("This handle is create mode."); return NULL; } if (handle->buf_size == 0) { - ERROR("Remain buffer size is 0."); - return NULL; - } - if (handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE) { - ERROR("Not without-type mode"); + ERR("Remain buffer size is 0."); return NULL; } dsize = *(int*)(handle->pos); buf = (handle->pos+sizeof(int)); - if (dsize == 0) // current position is next size field becasue data size is 0 + if (dsize == 0) /* current position is next size field becasue data size is 0 */ buf = NULL; used_size = _get_used_size(dsize); @@ -192,34 +177,30 @@ API void* pims_ipc_data_get(pims_ipc_data_h data, unsigned int *size) pims_ipc_data_h pims_ipc_data_steal_unmarshal(void *buf, unsigned int size) { - // ENTER(); - void *ptr = NULL; pims_ipc_data_s *handle = NULL; VERBOSE("size : %d", size); handle = calloc(1, sizeof(pims_ipc_data_s)); if (NULL == handle) { - ERROR("calloc() Fail"); + ERR("calloc() handle Fail"); + return NULL; + } + handle->buf = calloc(1, size + 1); + if (NULL == handle->buf) { + ERR("calloc() buf Fail"); + free(handle); return NULL; } + memcpy(handle->buf, buf, size); + handle->alloc_size = size; handle->free_size = 0; handle->buf_size = handle->alloc_size; - handle->buf = buf; handle->pos = handle->buf; handle->created = 0; handle->buf_alloced = 1; - ptr = pims_ipc_data_get(handle, &size); - if (!ptr || size != sizeof(int)) { - ERROR("pims_ipc_data_get : error"); - free(handle->buf); - free(handle); - return NULL; - } - handle->flags = *((int*)ptr); - - VERBOSE("handle[%p] flags[%x]", handle, handle->flags); + VERBOSE("handle[%p]", handle); return handle; } diff --git a/src/pims-ipc-pubsub.c b/src/pims-ipc-pubsub.c new file mode 100644 index 0000000..a87391c --- /dev/null +++ b/src/pims-ipc-pubsub.c @@ -0,0 +1,292 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /* epoll */ + +#include + +#include "pims-internal.h" +#include "pims-ipc-data-internal.h" +#include "pims-ipc-utils.h" +#include "pims-socket.h" +#include "pims-ipc-pubsub.h" + +typedef struct { + char *service; + gid_t group; + mode_t mode; + + int publish_sockfd; + int epoll_stop_thread; + pthread_mutex_t subscribe_fds_mutex; + GList *subscribe_fds; /* client fd list */ +} pims_ipc_svc_for_publish_s; + +static pims_ipc_svc_for_publish_s *_g_singleton_for_publish = NULL; + +static void __stop_for_publish(pims_ipc_svc_for_publish_s *ipc_svc) +{ + ipc_svc->epoll_stop_thread = TRUE; +} + +static void* __publish_loop(void *user_data) +{ + int ret; + int epfd; + struct sockaddr_un addr; + struct epoll_event ev = {0}; + pims_ipc_svc_for_publish_s *ipc_svc = user_data; + + unlink(ipc_svc->service); + ipc_svc->publish_sockfd = socket(PF_UNIX, SOCK_STREAM, 0); + if (ipc_svc->publish_sockfd < 0) { + ERR("socket() Fail"); + return NULL; + } + + bzero(&addr, sizeof(addr)); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service); + + int flags = fcntl(ipc_svc->publish_sockfd, F_GETFL, 0); + if (flags == -1) + flags = 0; + ret = fcntl(ipc_svc->publish_sockfd, F_SETFL, flags | O_NONBLOCK); + if (ret < 0) + ERR("fcntl() Fail(%d:%d)", ret, errno); + VERBOSE("publish socketfd fcntl : %d\n", ret); + + ret = bind(ipc_svc->publish_sockfd, (struct sockaddr *)&addr, sizeof(addr)); + if (ret != 0) + ERR("bind() Fail(%d)", ret); + ret = listen(ipc_svc->publish_sockfd, 30); + WARN_IF(ret != 0, "listen() Fail(%d)", ret); + + ret = chown(ipc_svc->service, getuid(), ipc_svc->group); + WARN_IF(ret != 0, "chown() Fail(%d)", ret); + ret = chmod(ipc_svc->service, ipc_svc->mode); + WARN_IF(ret != 0, "chmod() Fail(%d)", ret); + + epfd = epoll_create(MAX_EPOLL_EVENT); + + ev.events = EPOLLIN | EPOLLHUP; + ev.data.fd = ipc_svc->publish_sockfd; + + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ipc_svc->publish_sockfd, &ev); + WARN_IF(ret != 0, "epoll_ctl() Fail(%d)", ret); + + while (!ipc_svc->epoll_stop_thread) { + int i = 0; + struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, }; + int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1); + + if (ipc_svc->epoll_stop_thread) + break; + + if (event_num == -1) { + if (errno != EINTR) { + ERR("errno:%d\n", errno); + break; + } + } + + for (i = 0; i < event_num; i++) { + int event_fd = events[i].data.fd; + + if (events[i].events & EPOLLHUP) { + VERBOSE("client closed ----------------------------------:%d", event_fd); + if (epoll_ctl(epfd, EPOLL_CTL_DEL, event_fd, events) == -1) + ERR("epoll_ctl(EPOLL_CTL_DEL) Fail(%d)", errno); + + close(event_fd); + + /* Find client_id and delete */ + GList *cursor = NULL; + + pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); + cursor = g_list_first(ipc_svc->subscribe_fds); + while (cursor) { + if (event_fd == (int)cursor->data) { + ipc_svc->subscribe_fds = g_list_delete_link(ipc_svc->subscribe_fds, cursor); + break; + } + cursor = g_list_next(cursor); + } + pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); + continue; + } else if (event_fd == ipc_svc->publish_sockfd) { + /* connect client */ + struct sockaddr_un remote; + socklen_t remote_len = sizeof(remote); + int client_fd = accept(ipc_svc->publish_sockfd, (struct sockaddr *)&remote, &remote_len); + if (client_fd == -1) { + ERR("accept() Fail(%d)", errno); + continue; + } + VERBOSE("client subscriber connect: %d", client_fd); + + pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); + ipc_svc->subscribe_fds = g_list_append(ipc_svc->subscribe_fds, GINT_TO_POINTER(client_fd)); + pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); + + ev.events = EPOLLIN; + ev.data.fd = client_fd; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev) == -1) { + ERR("epoll_ctl(EPOLL_CTL_ADD) Fail(%d)", errno); + continue; + } + } + } + } + + close(ipc_svc->publish_sockfd); + close(epfd); + + return NULL; +} + +void pubsub_start() +{ + if (_g_singleton_for_publish) + utils_launch_thread(__publish_loop, _g_singleton_for_publish); +} + + +void pubsub_stop() +{ + if (_g_singleton_for_publish) + __stop_for_publish(_g_singleton_for_publish); +} + +API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode) +{ + RETVM_IF(_g_singleton_for_publish, -1, "Already exist"); + + _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_s, 1); + _g_singleton_for_publish->service = g_strdup(service); + _g_singleton_for_publish->group = group; + _g_singleton_for_publish->mode = mode; + pthread_mutex_init(&_g_singleton_for_publish->subscribe_fds_mutex, 0); + pthread_mutex_lock(&_g_singleton_for_publish->subscribe_fds_mutex); + _g_singleton_for_publish->subscribe_fds = NULL; + pthread_mutex_unlock(&_g_singleton_for_publish->subscribe_fds_mutex); + + return 0; +} + +API int pims_ipc_svc_deinit_for_publish(void) +{ + if (!_g_singleton_for_publish) + return -1; + + pthread_mutex_destroy(&_g_singleton_for_publish->subscribe_fds_mutex); + g_list_free(_g_singleton_for_publish->subscribe_fds); + + g_free(_g_singleton_for_publish->service); + g_free(_g_singleton_for_publish); + _g_singleton_for_publish = NULL; + + return 0; +} + +API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data) +{ + gboolean is_valid = FALSE; + unsigned int call_id_len; + unsigned int is_data = FALSE; + pims_ipc_data_s *data_in = data; + gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event); + pims_ipc_svc_for_publish_s *ipc_svc = _g_singleton_for_publish; + + if (call_id) + call_id_len = strlen(call_id); + else + call_id_len = 0; + + do { + unsigned int len, total_len; + len = sizeof(total_len) + sizeof(call_id_len) + call_id_len + sizeof(is_data); + total_len = len; + + if (data_in) { + is_data = TRUE; + len += sizeof(data_in->buf_size); + total_len = len + data_in->buf_size; + } + + int length = 0; + char buf[len+1]; + memset(buf, 0x0, len+1); + + memcpy(buf, &total_len, sizeof(total_len)); + length += sizeof(total_len); + + memcpy(buf+length, &call_id_len, sizeof(call_id_len)); + length += sizeof(call_id_len); + memcpy(buf+length, call_id, call_id_len); + length += call_id_len; + g_free(call_id); + + memcpy(buf+length, &is_data, sizeof(is_data)); + length += sizeof(is_data); + + if (is_data) { + memcpy(buf+length, &(data_in->buf_size), sizeof(data_in->buf_size)); + length += sizeof(data_in->buf_size); + } + + /* Publish to clients */ + pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); + GList *cursor = g_list_first(ipc_svc->subscribe_fds); + int ret = 0; + while (cursor) { + int fd = GPOINTER_TO_INT(cursor->data); + pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); + ret = socket_send(fd, buf, length); + if (ret < 0) + ERR("socket_send() Fail(%d)", ret); + + if (is_data) { + ret = socket_send_data(fd, data_in->buf, data_in->buf_size); + if (ret < 0) + ERR("socket_send_data() Fail(%d)", ret); + + } + pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); + cursor = cursor->next; + } + pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); + + is_valid = TRUE; + } while (0); + + if (is_valid == FALSE) + return -1; + + return 0; +} + diff --git a/src/pims-ipc-pubsub.h b/src/pims-ipc-pubsub.h new file mode 100644 index 0000000..d360746 --- /dev/null +++ b/src/pims-ipc-pubsub.h @@ -0,0 +1,26 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PIMS_PUBSUB_H__ +#define __PIMS_PUBSUB_H__ + +void pubsub_start(); +void pubsub_stop(); + +#endif /*__PIMS_PUBSUB_H__*/ + + diff --git a/src/pims-ipc-svc.c b/src/pims-ipc-svc.c index b0a9650..1d59629 100644 --- a/src/pims-ipc-svc.c +++ b/src/pims-ipc-svc.c @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -17,1848 +17,219 @@ */ #include -#include #include -#include -#include -#include #include -#include // pollfds -#include //fcntl #include -#include -#include - -#include -#include // sockaddr_un -#include // ioctl -#include // epoll -#include // eventfd -#include //socket -#include - -#include -#include -#include +#include +#include #include "pims-internal.h" -#include "pims-debug.h" #include "pims-socket.h" #include "pims-ipc-data.h" #include "pims-ipc-data-internal.h" +#include "pims-ipc-utils.h" +#include "pims-ipc-pubsub.h" +#include "pims-ipc-worker.h" #include "pims-ipc-svc.h" #define PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT 2 -typedef struct { - char *service; - gid_t group; - mode_t mode; - - // callback functions - GHashTable *cb_table; // call_id, cb_data - - // Global socket info and epoll thread - int sockfd; - bool epoll_stop_thread; - - ///////////////////////////////////////////// - // router inproc eventfd - int router; - int delay_count; // not need mutex - // epoll thread add client_fd, when receive, router read requests - GList *request_queue; // client_id lists to send request - pthread_mutex_t request_data_queue_mutex; - GHashTable *request_data_queue; // key : client id, data : GList pims_ipc_raw_data_s (client_fd, seq_no, request(command), additional data...) - // router add client when receive connecting request, remove client when disconneting request in router thread - // manager remove client when terminating client without disconnect request in router thread - GHashTable *client_worker_map; // key : client_id, worker_fd, not need mutex - GList *client_id_fd_map; // pims_ipc_client_map_s - //key :client_id(pid:seq_no), data : client_fd - - ///////////////////////////////////////////// - pthread_mutex_t task_fds_mutex; - // when starting worker thread, register fd - // when endting worker thread, deregister fd - GHashTable *task_fds; // worker_fd - worker data (worker fd, client_fd, request queue(GList), stop_thread) - int workers_max_count; - - ///////////////////////////////////////////// - // manager inproc eventfd - int manager; - // write by new worker thread, read by manager in router thread, need mutex - pthread_mutex_t manager_queue_from_worker_mutex; - GList *manager_queue_from_worker; // worker_fd => add to workers - // write in epoll thread(for dead client), read by manager in router thread, need mutex - pthread_mutex_t manager_queue_from_epoll_mutex; - GList *manager_queue_from_epoll; // cliend_fd => find worker_fd => add to idle workers - // managed by manager, router find idle worker when connecting new client in router thread => remove from idle workers - GList *workers; // worker_fd list, not need mutex - ///////////////////////////////////////////// - cynara *cynara; - pthread_mutex_t cynara_mutex; - - int unique_sequence_number; - pthread_mutex_t client_info_mutex; - GHashTable *worker_client_info_map; // key : worker_id, data : pims_ipc_client_info_s* - GHashTable *client_info_map; // key : client_id, data : pims_ipc_client_info_s* -} pims_ipc_svc_s; - -typedef struct { - char *smack; - char *uid; - char *client_session; -} pims_ipc_client_info_s ; - -typedef struct { - char *service; - gid_t group; - mode_t mode; - - int publish_sockfd; - bool epoll_stop_thread; - pthread_mutex_t subscribe_fds_mutex; - GList *subscribe_fds; // cliend fd list -} pims_ipc_svc_for_publish_s; - -typedef struct { - int fd; - char *id; -}pims_ipc_client_map_s; - -typedef struct { - pims_ipc_svc_call_cb callback; - void * user_data; -} pims_ipc_svc_cb_s; - -typedef struct { - pims_ipc_svc_client_disconnected_cb callback; - void * user_data; -} pims_ipc_svc_client_disconnected_cb_t; - -typedef struct { - int fd; - int worker_id; // pthrad_self() - int client_fd; - bool stop_thread; - GList *queue; // pims_ipc_raw_data_s list - pthread_mutex_t queue_mutex; -} pims_ipc_worker_data_s; - -typedef struct{ - char *client_id; - unsigned int client_id_len; - unsigned int seq_no; - char *call_id; - unsigned int call_id_len; - unsigned int is_data; - unsigned int data_len; - char *data; -}pims_ipc_raw_data_s; - -typedef struct { - int client_fd; - int request_count; - GList *raw_data; // pims_ipc_raw_data_s list - pthread_mutex_t raw_data_mutex; -}pims_ipc_request_s; - static pims_ipc_svc_s *_g_singleton = NULL; -static pims_ipc_svc_for_publish_s *_g_singleton_for_publish = NULL; - -static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL}; - -static void __free_raw_data(pims_ipc_raw_data_s *data) -{ - if (!data) return; - - free(data->client_id); - free(data->call_id); - free(data->data); - free(data); -} -static void __worker_data_free(gpointer data) +gboolean svc_map_client_worker(gpointer user_data) { - pims_ipc_worker_data_s *worker_data = (pims_ipc_worker_data_s*)data; + char *client_id = user_data; + pims_ipc_worker_data_s *worker_data; - pthread_mutex_lock(&worker_data->queue_mutex); - if (worker_data->queue) { - GList *cursor = g_list_first(worker_data->queue); - while(cursor) { - GList *l = cursor; - pims_ipc_raw_data_s *data = l->data; - cursor = g_list_next(cursor); - worker_data->queue = g_list_remove_link(worker_data->queue, l); - g_list_free(l); - __free_raw_data(data); - } + worker_data = worker_get_idle_worker(_g_singleton, client_id); + if (NULL == worker_data) { + ERR("worker_get_idle_worker() Fail"); + return G_SOURCE_CONTINUE; } - pthread_mutex_unlock(&worker_data->queue_mutex); - free(worker_data); + return G_SOURCE_REMOVE; } -static void _destroy_client_info(gpointer p) +API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode) { - pims_ipc_client_info_s *client_info = p; + RETVM_IF(_g_singleton, -1, "Already exist"); - if (NULL == client_info) - return; - free(client_info->smack); - free(client_info->uid); - free(client_info->client_session); - free(client_info); -} + int ret = 0; -API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode) -{ - if (_g_singleton) { - ERROR("Already exist"); + _g_singleton = g_new0(pims_ipc_svc_s, 1); + if (NULL == _g_singleton) { + ERR("g_new0() Fail"); return -1; } - _g_singleton = g_new0(pims_ipc_svc_s, 1); - ASSERT(_g_singleton); + ret = cynara_initialize(&_g_singleton->cynara, NULL); + if (CYNARA_API_SUCCESS != ret) { + char errmsg[1024] = {0}; + cynara_strerror(ret, errmsg, sizeof(errmsg)); + ERR("cynara_initialize() Fail(%d,%s)", ret, errmsg); + return -1; + } _g_singleton->service = g_strdup(service); _g_singleton->group = group; _g_singleton->mode = mode; _g_singleton->workers_max_count = PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT; - _g_singleton->cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); - ASSERT(_g_singleton->cb_table); - pthread_mutex_init(&_g_singleton->request_data_queue_mutex, 0); - _g_singleton->request_queue = NULL; - _g_singleton->request_data_queue = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // client_id - pims_ipc_raw_data_s - ASSERT(_g_singleton->request_data_queue); - _g_singleton->client_worker_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // client id - worker_fd mapping + /* client id - worker_fd mapping */ + _g_singleton->client_worker_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); ASSERT(_g_singleton->client_worker_map); - _g_singleton->delay_count = 0; - - pthread_mutex_init(&_g_singleton->task_fds_mutex, 0); - _g_singleton->task_fds = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __worker_data_free); // pims_ipc_worker_data_s - ASSERT(_g_singleton->task_fds); - - pthread_mutex_init(&_g_singleton->manager_queue_from_epoll_mutex, 0); - _g_singleton->manager_queue_from_epoll = NULL; - - pthread_mutex_init(&_g_singleton->manager_queue_from_worker_mutex, 0); - _g_singleton->manager_queue_from_worker = NULL; - _g_singleton->workers = NULL; - _g_singleton->unique_sequence_number = 0; + pthread_mutex_init(&_g_singleton->manager_list_hangup_mutex, 0); + _g_singleton->manager_list_hangup = NULL; - _g_singleton->worker_client_info_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, _destroy_client_info); - _g_singleton->client_info_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, _destroy_client_info); - pthread_mutex_init(&_g_singleton->client_info_mutex, 0); + worker_init(); + client_init(); pthread_mutex_init(&_g_singleton->cynara_mutex, 0); - _g_singleton->epoll_stop_thread = false; + return 0; +} - int ret = cynara_initialize(&_g_singleton->cynara, NULL); - if (CYNARA_API_SUCCESS != ret) { - char errmsg[1024] = {0}; - cynara_strerror(ret, errmsg, sizeof(errmsg)); - ERROR("cynara_initialize() Fail(%d,%s)", ret, errmsg); - return -1; +static void __remove_client_fd_map(pims_ipc_svc_s *ipc_svc) +{ + GList *next, *cursor; + pims_ipc_client_map_s *client; + + cursor = g_list_first(ipc_svc->client_id_fd_map); + while (cursor) { + next = cursor->next; + client = cursor->data; + worker_stop_client_worker(ipc_svc, client->id); + free(client->id); + free(client); + cursor = next; } - return 0; + g_list_free(ipc_svc->client_id_fd_map); } API int pims_ipc_svc_deinit(void) { - if (!_g_singleton) - return -1; + RETV_IF(NULL == _g_singleton, -1); g_free(_g_singleton->service); - g_hash_table_destroy(_g_singleton->cb_table); - - pthread_mutex_destroy(&_g_singleton->request_data_queue_mutex); - g_hash_table_destroy(_g_singleton->client_worker_map); - g_hash_table_destroy(_g_singleton->request_data_queue); - g_list_free_full(_g_singleton->request_queue, g_free); - - pthread_mutex_destroy(&_g_singleton->task_fds_mutex); - g_hash_table_destroy(_g_singleton->task_fds); - - pthread_mutex_destroy(&_g_singleton->manager_queue_from_epoll_mutex); - g_list_free_full(_g_singleton->manager_queue_from_epoll, g_free); - pthread_mutex_destroy(&_g_singleton->manager_queue_from_worker_mutex); - g_list_free(_g_singleton->manager_queue_from_worker); - GList *cursor = g_list_first(_g_singleton->client_id_fd_map); - while(cursor) { - pims_ipc_client_map_s *client = cursor->data; - _g_singleton->client_id_fd_map = g_list_remove_link(_g_singleton->client_id_fd_map, cursor); //free(client_id); - free(client->id); - free(client); - g_list_free(cursor); - cursor = g_list_first(_g_singleton->client_id_fd_map); - } - g_list_free(_g_singleton->client_id_fd_map); + pthread_mutex_destroy(&_g_singleton->manager_list_hangup_mutex); + g_list_free_full(_g_singleton->manager_list_hangup, g_free); - pthread_mutex_destroy(&_g_singleton->client_info_mutex); - g_hash_table_destroy(_g_singleton->worker_client_info_map); - g_hash_table_destroy(_g_singleton->client_info_map); + __remove_client_fd_map(_g_singleton); + worker_deinit(); + g_hash_table_destroy(_g_singleton->client_worker_map); + client_deinit(); pthread_mutex_lock(&_g_singleton->cynara_mutex); int ret = cynara_finish(_g_singleton->cynara); if (CYNARA_API_SUCCESS != ret) { char errmsg[1024] = {0}; cynara_strerror(ret, errmsg, sizeof(errmsg)); - ERROR("cynara_finish() Fail(%d,%s)", ret, errmsg); + ERR("cynara_finish() Fail(%d,%s)", ret, errmsg); } pthread_mutex_unlock(&_g_singleton->cynara_mutex); pthread_mutex_destroy(&_g_singleton->cynara_mutex); - g_list_free(_g_singleton->workers); g_free(_g_singleton); _g_singleton = NULL; return 0; } -API int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata) +API int pims_ipc_svc_register(char *module, char *function, + pims_ipc_svc_call_cb callback, void *userdata) { - pims_ipc_svc_cb_s *cb_data = NULL; gchar *call_id = NULL; + pims_ipc_svc_cb_s *cb_data = NULL; - if (!module || !function || !callback) { - ERROR("Invalid argument"); + RETV_IF(NULL == module, -1); + RETV_IF(NULL == function, -1); + RETV_IF(NULL == callback, -1); + + cb_data = calloc(1, sizeof(pims_ipc_svc_cb_s)); + if (NULL == cb_data) { + ERR("calloc() Fail"); return -1; } - cb_data = g_new0(pims_ipc_svc_cb_s, 1); - call_id = PIMS_IPC_MAKE_CALL_ID(module, function); + call_id = PIMS_IPC_MAKE_CALL_ID(module, function); VERBOSE("register cb id[%s]", call_id); + cb_data->callback = callback; cb_data->user_data = userdata; - g_hash_table_insert(_g_singleton->cb_table, call_id, cb_data); - - return 0; -} - -API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode) -{ - if (_g_singleton_for_publish) { - ERROR("Already exist"); - return -1; - } - - _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_s, 1); - _g_singleton_for_publish->service = g_strdup(service); - _g_singleton_for_publish->group = group; - _g_singleton_for_publish->mode = mode; - _g_singleton_for_publish->subscribe_fds = NULL; - - pthread_mutex_init(&_g_singleton_for_publish->subscribe_fds_mutex, 0); - - return 0; -} - -API int pims_ipc_svc_deinit_for_publish(void) -{ - if (!_g_singleton_for_publish) - return -1; - - pthread_mutex_destroy(&_g_singleton_for_publish->subscribe_fds_mutex); - g_list_free(_g_singleton_for_publish->subscribe_fds); - - g_free(_g_singleton_for_publish->service); - g_free(_g_singleton_for_publish); - _g_singleton_for_publish = NULL; - - return 0; -} - -API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data) -{ - pims_ipc_svc_for_publish_s *ipc_svc = _g_singleton_for_publish; - gboolean is_valid = FALSE; - gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event); - pims_ipc_data_s *data_in = (pims_ipc_data_s*)data; - unsigned int call_id_len = strlen(call_id); - unsigned int is_data = FALSE; - do { - // make publish data - unsigned int len = sizeof(unsigned int) // total size - + call_id_len + sizeof(unsigned int) // call_id - + sizeof(unsigned int); // is data - unsigned int total_len = len; + worker_set_callback(call_id, cb_data); - if (data_in) { - is_data = TRUE; - len += sizeof(unsigned int); - total_len = len + data_in->buf_size; // data - } - - char buf[len+1]; - int length = 0; - memset(buf, 0x0, len+1); - - // total_size - memcpy(buf, (void*)&total_len, sizeof(unsigned int)); - length += sizeof(unsigned int); - - // call_id - memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(call_id), call_id_len); - length += call_id_len; - g_free(call_id); - - // is_data - memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int)); - length += sizeof(unsigned int); - - // data - if (is_data) { - memcpy(buf+length, (void*)&(data_in->buf_size), sizeof(unsigned int)); - length += sizeof(unsigned int); - } - - // Publish to clients - pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); - GList *cursor = g_list_first(ipc_svc->subscribe_fds); - int ret = 0; - while(cursor) { - int fd = (int)cursor->data; - pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); - ret = socket_send(fd, buf, length); - if (ret < 0) { - ERROR("socket_send publish error : %d", ret); - } - - if (is_data) { - ret = socket_send_data(fd, data_in->buf, data_in->buf_size); - if (ret < 0) { - ERROR("socket_send_data publish error : %d", ret); - } - } - pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); - cursor = g_list_next(cursor); - } - pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); - - is_valid = TRUE; - } while (0); - - if (is_valid == FALSE) - return -1; return 0; } -static void __run_callback(int worker_id, char *call_id, pims_ipc_data_h dhandle_in, pims_ipc_data_h *dhandle_out) -{ - pims_ipc_svc_cb_s *cb_data = NULL; - - VERBOSE("Call id [%s]", call_id); - - cb_data = (pims_ipc_svc_cb_s*)g_hash_table_lookup(_g_singleton->cb_table, call_id); - if (cb_data == NULL) { - VERBOSE("unable to find %s", call_id); - return; - } - - cb_data->callback((pims_ipc_h)worker_id, dhandle_in, dhandle_out, cb_data->user_data); -} - -static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data, pims_ipc_raw_data_s **out) -{ - if (NULL == out) { - ERROR("Invalid parameter:out is NULL"); - return; - } - - pims_ipc_raw_data_s *raw_data = NULL; - raw_data = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s)); - if (NULL == raw_data) { - ERROR("calloc() Fail"); - return; - } - pims_ipc_data_s *data_in = (pims_ipc_data_s*)data; - - raw_data->call_id = strdup(call_id); - raw_data->call_id_len = strlen(raw_data->call_id); - raw_data->seq_no = seq_no; - - if (data_in && data_in->buf_size > 0) { - raw_data->is_data = TRUE; - raw_data->data = calloc(1, data_in->buf_size+1); - if (NULL == raw_data->data) { - ERROR("calloc() Fail"); - free(raw_data->call_id); - free(raw_data); - return; - } - memcpy(raw_data->data, data_in->buf, data_in->buf_size); - raw_data->data_len = data_in->buf_size; - } - else { - raw_data->is_data = FALSE; - raw_data->data_len = 0; - raw_data->data = NULL; - } - *out = raw_data; - return; -} - -static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data) +API void pims_ipc_svc_run_main_loop(GMainLoop *loop) { - int ret = 0; - unsigned int client_id_len = strlen(client_id); - - if (!data) { - INFO("No data to send NULL\n"); - return -1; - } - - unsigned int len = sizeof(unsigned int) // total size - + client_id_len + sizeof(unsigned int) // client_id - + sizeof(unsigned int) // seq_no - + data->call_id_len + sizeof(unsigned int) // call_id - + sizeof(unsigned int); // is data - unsigned int total_len = len; - - if (data->is_data) { - len += sizeof(unsigned int); // data - total_len = len + data->data_len; // data - } - - INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id, data->call_id, data->seq_no, len, total_len); - - char buf[len+1]; - - int length = 0; - memset(buf, 0x0, len+1); - - // total_len - memcpy(buf, (void*)&total_len, sizeof(unsigned int)); - length += sizeof(unsigned int); - - // client_id - memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(client_id), client_id_len); - length += client_id_len; - - // seq_no - memcpy(buf+length, (void*)&(data->seq_no), sizeof(unsigned int)); - length += sizeof(unsigned int); - - // call id - memcpy(buf+length, (void*)&(data->call_id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(data->call_id), data->call_id_len); - length += data->call_id_len; - - // is_data - memcpy(buf+length, (void*)&(data->is_data), sizeof(unsigned int)); - length += sizeof(unsigned int); - - if (data->is_data) { - memcpy(buf+length, (void*)&(data->data_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - ret = socket_send(fd, buf, length); + GMainLoop *main_loop = loop; - // send data - if (ret > 0) - ret += socket_send_data(fd, data->data, data->data_len); - } - else - ret = socket_send(fd, buf, length); - - return ret; -} + if (main_loop == NULL) + main_loop = g_main_loop_new(NULL, FALSE); -static gboolean __worker_raw_data_pop(pims_ipc_worker_data_s *worker, pims_ipc_raw_data_s **data) -{ - if (!worker) - return FALSE; + pubsub_start(); - pthread_mutex_lock(&worker->queue_mutex); - if (!worker->queue) { - pthread_mutex_unlock(&worker->queue_mutex); - *data = NULL; - return FALSE; + if (_g_singleton) { + /* launch worker threads in advance */ + worker_start_idle_worker(_g_singleton); + socket_set_handler(_g_singleton); } - *data = g_list_first(worker->queue)->data; - worker->queue = g_list_delete_link(worker->queue, g_list_first(worker->queue)); - pthread_mutex_unlock(&worker->queue_mutex); + g_main_loop_run(main_loop); - return TRUE; + worker_stop_idle_worker(); + pubsub_stop(); } -static void* __worker_loop(void *data) +API bool pims_ipc_svc_check_privilege(pims_ipc_h ipc, char *privilege) { int ret; - int worker_id; - int worker_fd; - pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data; - pims_ipc_worker_data_s *worker_data; - bool disconnected = false; - - worker_fd = eventfd(0, 0); - if (worker_fd == -1) - return NULL; - worker_id = (int)pthread_self(); - - worker_data = calloc(1, sizeof(pims_ipc_worker_data_s)); - if (NULL == worker_data) { - ERROR("calloc() Fail"); - close(worker_fd); - return NULL; - } - worker_data->fd = worker_fd; - worker_data->worker_id = worker_id; - worker_data->client_fd = -1; - worker_data->stop_thread = false; - pthread_mutex_init(&worker_data->queue_mutex, 0); - - pthread_mutex_lock(&ipc_svc->task_fds_mutex); - g_hash_table_insert(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd), worker_data); - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - - pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex); - ipc_svc->manager_queue_from_worker = g_list_append(ipc_svc->manager_queue_from_worker, (void*)worker_fd); - pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex); - - write_command(ipc_svc->manager, 1); - DEBUG("worker register to manager : worker_id(%08x00), worker_fd(%d)\n", worker_id, worker_fd); - - struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof(struct pollfd)); - if (NULL == pollfds) { - ERROR("calloc() Fail"); - g_hash_table_remove(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd)); - free(worker_data); - close(worker_fd); - return NULL; - } - pollfds[0].fd = worker_fd; - pollfds[0].events = POLLIN; - - while (!worker_data->stop_thread) { - while(1) { - if (worker_data->stop_thread) - break; - ret = poll(pollfds, 1, 3000); // waiting command from router - if (ret == -1 && errno == EINTR) { - continue; - } - break; - } - - if (worker_data->stop_thread) - break; - - if (ret > 0) { - pims_ipc_raw_data_s *raw_data = NULL; - pims_ipc_raw_data_s *result = NULL; - - if (pollfds[0].revents & POLLIN) { - uint64_t dummy; - read_command(pollfds[0].fd, &dummy); - if (__worker_raw_data_pop(worker_data, &raw_data)) { - pims_ipc_data_h data_in = NULL; - pims_ipc_data_h data_out = NULL; - if (strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id) == 0) { - - } - else if (strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id) == 0) { - disconnected = true; - } - else { - data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len); - raw_data->data = NULL; - raw_data->data_len = 0; - raw_data->is_data = false; - __run_callback(worker_id, raw_data->call_id, data_in, &data_out); - pims_ipc_data_destroy(data_in); - } - - if (data_out) { - __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result); - pims_ipc_data_destroy(data_out); - } - else - __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result); - - if (worker_data->client_fd != -1) - __send_raw_data(worker_data->client_fd, raw_data->client_id, result); - __free_raw_data(raw_data); - __free_raw_data(result); - } - } - } - } - - if (!disconnected) - ERROR("client fd closed, worker_fd : %d", worker_fd); - INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd); - - pthread_mutex_lock(&ipc_svc->client_info_mutex); - g_hash_table_remove(ipc_svc->worker_client_info_map, GINT_TO_POINTER(worker_id)); - pthread_mutex_unlock(&ipc_svc->client_info_mutex); - - pthread_mutex_lock(&ipc_svc->task_fds_mutex); - g_hash_table_remove(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd)); // __worker_data_free will be called - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - - close(worker_fd); - free ((void*)pollfds); - - if (_client_disconnected_cb.callback) - _client_disconnected_cb.callback((pims_ipc_h)worker_id, _client_disconnected_cb.user_data); - - return NULL; -} - -static void __launch_thread(void *(*start_routine) (void *), void *data) -{ - int ret = 0; - - pthread_t worker; - pthread_attr_t attr; - - // set kernel thread - ret = pthread_attr_init(&attr); - if (0 != ret) { - ERROR("pthread_attr_init() Fail(%d)", ret); - return; - } - ret = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); - if (0 != ret) { - ERROR("pthread_attr_setscope() Fail(%d)", ret); - return; - } - ret = pthread_create(&worker, &attr, start_routine, data); - if (0 != ret) { - ERROR("pthread_create() Fail(%d)", ret); - return; - } - pthread_detach(worker); -} - -static gboolean __is_worker_available() -{ - if (_g_singleton->workers) - return TRUE; - else - return FALSE; -} - -static int __get_worker(const char *client_id, int *worker_fd) -{ - ASSERT(client_id); - ASSERT(worker_fd); - - if (!__is_worker_available()) { - ERROR("There is no idle worker"); - return -1; - } - *worker_fd = (int)(g_list_first(_g_singleton->workers)->data); - _g_singleton->workers = g_list_delete_link(_g_singleton->workers, - g_list_first(_g_singleton->workers)); - - g_hash_table_insert(_g_singleton->client_worker_map, g_strdup(client_id), GINT_TO_POINTER(*worker_fd)); - - return 0; -} - -static int __find_worker(const char *client_id, int *worker_fd) -{ - char *orig_pid = NULL; - int fd; - - ASSERT(client_id); - ASSERT(worker_fd); - - if (FALSE == g_hash_table_lookup_extended(_g_singleton->client_worker_map, client_id, - (gpointer*)&orig_pid, (gpointer*)&fd)) { - VERBOSE("unable to find worker id for %s", client_id); - return -1; - } - - *worker_fd = GPOINTER_TO_INT(fd); - return 0; -} - -static bool __request_pop(pims_ipc_request_s *data_queue, pims_ipc_raw_data_s **data) -{ - bool ret = false; - GList *cursor; + int pid = (int)ipc; - pthread_mutex_lock(&data_queue->raw_data_mutex); - cursor = g_list_first(data_queue->raw_data); - if (cursor) { - *data = cursor->data; - data_queue->raw_data = g_list_delete_link(data_queue->raw_data, cursor); - (data_queue->request_count)--; + RETV_IF(NULL == privilege, false); - ret = true; + pims_ipc_client_info_s *client_info = NULL; + client_info = client_get_info(pid); + if (NULL == client_info) { + ERR("client_info is NULL"); + return false; } - else - *data = NULL; + pims_ipc_client_info_s *client_clone = NULL; + client_clone = client_clone_info(client_info); - pthread_mutex_unlock(&data_queue->raw_data_mutex); - return ret; -} + pthread_mutex_lock(&_g_singleton->cynara_mutex); + ret = cynara_check(_g_singleton->cynara, client_clone->smack, + client_clone->client_session, client_clone->uid, privilege); + pthread_mutex_unlock(&_g_singleton->cynara_mutex); -static bool __worker_raw_data_push(pims_ipc_worker_data_s *worker_data, int client_fd, pims_ipc_raw_data_s *data) -{ - pthread_mutex_lock(&worker_data->queue_mutex); - worker_data->queue = g_list_append(worker_data->queue, data); - worker_data->client_fd = client_fd; - pthread_mutex_unlock(&worker_data->queue_mutex); + client_destroy_info(client_clone); - return true; -} + if (CYNARA_API_ACCESS_ALLOWED == ret) + return true; -static int _find_worker_id(pims_ipc_svc_s *ipc_svc, int worker_fd) -{ - int worker_id = 0; - pims_ipc_worker_data_s *worker_data = NULL; - pthread_mutex_lock(&ipc_svc->task_fds_mutex); - worker_data = g_hash_table_lookup(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd)); - if (NULL == worker_data) { - ERROR("g_hash_table_lookup(%d) return NULL", worker_fd); - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - return -1; - } - worker_id = worker_data->worker_id; - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - return worker_id; + return false; } -static int _create_client_info(int fd, pims_ipc_client_info_s **p_client_info) +API int pims_ipc_svc_get_smack_label(pims_ipc_h ipc, char **p_smack) { - int ret; - pid_t pid; - char errmsg[1024] = {0}; + pims_ipc_client_info_s *client_info = NULL; + int pid = (int)ipc; - pims_ipc_client_info_s *client_info = calloc(1, sizeof(pims_ipc_client_info_s)); + client_info = client_get_info(pid); if (NULL == client_info) { - ERROR("calloc() return NULL"); - return -1; - } - - ret = cynara_creds_socket_get_client(fd, CLIENT_METHOD_SMACK, &(client_info->smack)); - if (CYNARA_API_SUCCESS != ret) { - cynara_strerror(ret, errmsg, sizeof(errmsg)); - ERROR("cynara_creds_socket_get_client() Fail(%d,%s)", ret, errmsg); - _destroy_client_info(client_info); - return -1; - } - - ret = cynara_creds_socket_get_user(fd, USER_METHOD_UID, &(client_info->uid)); - if (CYNARA_API_SUCCESS != ret) { - cynara_strerror(ret, errmsg, sizeof(errmsg)); - ERROR("cynara_creds_socket_get_user() Fail(%d,%s)", ret, errmsg); - _destroy_client_info(client_info); - return -1; - } - - ret = cynara_creds_socket_get_pid(fd, &pid); - if (CYNARA_API_SUCCESS != ret) { - cynara_strerror(ret, errmsg, sizeof(errmsg)); - ERROR("cynara_creds_socket_get_pid() Fail(%d,%s)", ret, errmsg); - _destroy_client_info(client_info); - return -1; + ERR("client_info is NULL"); + return false; } - client_info->client_session = cynara_session_from_pid(pid); - if (NULL == client_info->client_session) { - ERROR("cynara_session_from_pid() return NULL"); - _destroy_client_info(client_info); - return -1; + if (client_info->smack) { + *p_smack = strdup(client_info->smack); + if (NULL == *p_smack) { + ERR("strdup() return NULL"); + return -1; + } } - *p_client_info = client_info; - - return 0; -} - -static pims_ipc_client_info_s* _clone_client_info(pims_ipc_client_info_s *client_info) -{ - if (NULL == client_info) { - ERROR("client_info is NULL"); - return NULL; - } - - pims_ipc_client_info_s *clone = calloc(1, sizeof(pims_ipc_client_info_s)); - if (NULL == clone) { - ERROR("calloc() Fail"); - return NULL; - } - - if (client_info->smack) { - clone->smack = strdup(client_info->smack); - if (NULL == clone->smack) { - ERROR("strdup() Fail"); - _destroy_client_info(clone); - return NULL; - } - } - - if (client_info->uid) { - clone->uid = strdup(client_info->uid); - if (NULL == clone->uid) { - ERROR("strdup() Fail"); - _destroy_client_info(clone); - return NULL; - } - } - - if (client_info->client_session) { - clone->client_session = strdup(client_info->client_session); - if (NULL == clone->client_session) { - ERROR("strdup() Fail"); - _destroy_client_info(clone); - return NULL; - } - } - - return clone; -} - - -static int __process_router_event(pims_ipc_svc_s *ipc_svc, gboolean for_queue) -{ - gboolean is_valid = FALSE; - pims_ipc_request_s *data_queue = NULL; - GList *queue_cursor = NULL; - int worker_fd = 0; - char *client_id = NULL; - int *org_fd; - int ret; - - do { - pthread_mutex_lock(&ipc_svc->request_data_queue_mutex); - queue_cursor = g_list_first(ipc_svc->request_queue); - if (NULL == queue_cursor) { - pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex); - return 0; - } - client_id = (char *)(queue_cursor->data); - ASSERT(client_id != NULL); - pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex); - - ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd, (gpointer*)&data_queue); - - if (for_queue) - ipc_svc->delay_count--; - - if (ret == TRUE && data_queue) { - int *org_fd; - pims_ipc_worker_data_s *worker_data = NULL; - - pthread_mutex_lock(&data_queue->raw_data_mutex); - GList *cursor = g_list_first(data_queue->raw_data); - if (!cursor) { - pthread_mutex_unlock(&data_queue->raw_data_mutex); - break; - } - - pims_ipc_raw_data_s *data = (pims_ipc_raw_data_s*)(cursor->data); - if (NULL == data) { - ERROR("data is NULL"); - pthread_mutex_unlock(&data_queue->raw_data_mutex); - break; - } - char *call_id = data->call_id; - int client_fd = data_queue->client_fd; - - ASSERT(call_id != NULL); - - VERBOSE("call_id = [%s]", call_id); - if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0) { - // Get a worker. If cannot get a worker, create a worker and enqueue a current request - __launch_thread(__worker_loop, ipc_svc); - if (__get_worker((const char*)client_id, &worker_fd) != 0) { - ipc_svc->delay_count++; - pthread_mutex_unlock(&data_queue->raw_data_mutex); - is_valid = TRUE; - break; - } - int worker_id = _find_worker_id(ipc_svc, worker_fd); - pthread_mutex_lock(&ipc_svc->client_info_mutex); - pims_ipc_client_info_s *client_info = g_hash_table_lookup(ipc_svc->client_info_map, client_id); - pims_ipc_client_info_s *client_info_clone = _clone_client_info(client_info); - g_hash_table_insert(ipc_svc->worker_client_info_map, GINT_TO_POINTER(worker_id), client_info_clone); - pthread_mutex_unlock(&ipc_svc->client_info_mutex); - } - else { - // Find a worker - if (__find_worker((const char*)client_id, &worker_fd) != 0) { - ERROR("unable to find a worker"); - pthread_mutex_unlock(&data_queue->raw_data_mutex); - break; - } - } - pthread_mutex_unlock(&data_queue->raw_data_mutex); - - VERBOSE("routing client_id : %s, seq_no: %d, client_fd = %d, worker fd = %d", client_id, data->seq_no, client_fd, worker_fd); - - if (worker_fd <= 0) - break; - - pthread_mutex_lock(&ipc_svc->task_fds_mutex); - if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds, - GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) { - ERROR("hash lookup fail : worker_fd (%d)", worker_fd); - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - break; - } - - if (__request_pop(data_queue, &data)) { - __worker_raw_data_push(worker_data, client_fd, data); - write_command(worker_fd, 1); - } - - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - } - - pthread_mutex_lock(&ipc_svc->request_data_queue_mutex); - free(client_id); - ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, queue_cursor); - pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex); - - is_valid = TRUE; - } while (0); - - if (is_valid == FALSE) - return -1; - - return 1; -} - -static int __process_manager_event(pims_ipc_svc_s *ipc_svc) -{ - GList *cursor = NULL; - int worker_fd; - - // client socket terminated without disconnect request - pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex); - if (ipc_svc->manager_queue_from_epoll) { - cursor = g_list_first(ipc_svc->manager_queue_from_epoll); - char *client_id = (char*)cursor->data; - __find_worker(client_id, &worker_fd); - - ipc_svc->manager_queue_from_epoll = g_list_delete_link(ipc_svc->manager_queue_from_epoll, cursor); - pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex); - - // remove client_fd - g_hash_table_remove(ipc_svc->client_worker_map, client_id); - free(client_id); - - // stop worker thread - if (worker_fd) { - int *org_fd; - pims_ipc_worker_data_s *worker_data; - - pthread_mutex_lock(&ipc_svc->task_fds_mutex); - if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds, - GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) { - ERROR("g_hash_table_lookup_extended fail : worker_fd (%d)", worker_fd); - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - return -1; - } - worker_data->stop_thread = true; - worker_data->client_fd = -1; - pthread_mutex_unlock(&ipc_svc->task_fds_mutex); - - write_command(worker_fd, 1); - VERBOSE("write command to worker terminate (worker_fd : %d)", worker_fd); - } - return 0; - } - pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex); - - // create new worker - pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex); - if (ipc_svc->manager_queue_from_worker) { - - cursor = g_list_first(ipc_svc->manager_queue_from_worker); - while (cursor) { - worker_fd = (int)cursor->data; - ipc_svc->manager_queue_from_worker = g_list_delete_link(ipc_svc->manager_queue_from_worker, cursor); - - if (worker_fd) { - DEBUG("add idle worker_fd : %d", worker_fd); - ipc_svc->workers = g_list_append(ipc_svc->workers, (void*)worker_fd); - } - cursor = g_list_first(ipc_svc->manager_queue_from_worker); - } - pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex); - return 0; - } - pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex); - - return 0; -} - -// if delete = true, steal client_id, then free(client_id) -// if delete = false, return client_id pointer, then do no call free(client_id -static int __find_client_id(pims_ipc_svc_s *ipc_svc, int client_fd, bool delete, char **client_id) -{ - pims_ipc_client_map_s *client; - GList *cursor = NULL; - cursor = g_list_first(ipc_svc->client_id_fd_map); - while(cursor) { - client = cursor->data; - if (client->fd == client_fd) { - *client_id = client->id; - if (delete) { - client->id = NULL; - ipc_svc->client_id_fd_map = g_list_delete_link(ipc_svc->client_id_fd_map, cursor); //free(client); - free(client); - } - return 0; - } - cursor = g_list_next(cursor); - } - return -1; -} - -static void __request_push(pims_ipc_svc_s *ipc_svc, char *client_id, int client_fd, pims_ipc_raw_data_s *data) -{ - int ret; - int *org_fd; - pims_ipc_request_s *data_queue = NULL; - if (NULL == data) { - ERROR("data is NULL"); - return; - } - - pthread_mutex_lock(&ipc_svc->request_data_queue_mutex); - ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue); - if (ret == TRUE && data_queue) { - } - else { - data_queue = calloc(1, sizeof(pims_ipc_request_s)); - if (NULL == data_queue) { - ERROR("calloc() Fail"); - pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex); - return; - } - data_queue->request_count = 0; - pthread_mutex_init(&data_queue->raw_data_mutex, 0); - - g_hash_table_insert(ipc_svc->request_data_queue, g_strdup(client_id), data_queue); - } - ipc_svc->request_queue = g_list_append(ipc_svc->request_queue, g_strdup(client_id)); - pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex); - - pthread_mutex_lock(&data_queue->raw_data_mutex); - data_queue->raw_data = g_list_append(data_queue->raw_data, data); - data_queue->client_fd = client_fd; - data_queue->request_count++; - pthread_mutex_unlock(&data_queue->raw_data_mutex); -} - -static void __delete_request_queue(pims_ipc_svc_s *ipc_svc, char *client_id) -{ - pims_ipc_request_s *data_queue = NULL; - int ret; - int *org_fd; - GList *l; - GList *cursor; - - pthread_mutex_lock(&ipc_svc->request_data_queue_mutex); - ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue); - if (ret == TRUE) - g_hash_table_remove(ipc_svc->request_data_queue, (void*)client_id); - - cursor = g_list_first(ipc_svc->request_queue); - while (cursor) { - l = cursor; - char *id = l->data; - cursor = g_list_next(cursor); - if (id && strcmp(id, client_id) == 0) { - free(id); - ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, l); - } - } - pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex); - - if (data_queue) { - pthread_mutex_lock(&data_queue->raw_data_mutex); - cursor = g_list_first(data_queue->raw_data); - pims_ipc_raw_data_s *data; - while(cursor) { - l = cursor; - data = (pims_ipc_raw_data_s *)cursor->data; - cursor = g_list_next(cursor); - data_queue->raw_data = g_list_delete_link(data_queue->raw_data, l); - __free_raw_data(data); - } - g_list_free(data_queue->raw_data); - pthread_mutex_unlock(&data_queue->raw_data_mutex); - pthread_mutex_destroy(&data_queue->raw_data_mutex); - free(data_queue); - } -} - -static int __send_identify(int fd, unsigned int seq_no, char *id, int id_len) -{ - int len = sizeof(unsigned int) // total size - + id_len + sizeof(unsigned int) // id - + sizeof(unsigned int); // seq_no - - char buf[len+1]; - - int length = 0; - memset(buf, 0x0, len+1); - - // total len - memcpy(buf, (void*)&len, sizeof(unsigned int)); - length += sizeof(unsigned int); - - // id - memcpy(buf+length, (void*)&(id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(id), id_len); - length += id_len; - - // seq_no - memcpy(buf+length, (void*)&(seq_no), sizeof(unsigned int)); - length += sizeof(unsigned int); - - return socket_send(fd, buf, length); -} - -static int __recv_raw_data(int fd, pims_ipc_raw_data_s **data, bool *identity) -{ - int len = 0; - pims_ipc_raw_data_s *temp; - - /* read the size of message. note that ioctl is non-blocking */ - if (ioctl(fd, FIONREAD, &len)) { - ERROR("ioctl failed: %d", errno); - return -1; - } - - /* when server or client closed socket */ - if (len == 0) { - INFO("[IPC Socket] connection is closed"); - return 0; - } - - temp = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s)); - if (NULL == temp) { - ERROR("calloc() Fail"); - return -1; - } - temp->client_id = NULL; - temp->client_id_len = 0; - temp->call_id = NULL; - temp->call_id_len = 0; - temp->seq_no = 0; - temp->is_data = FALSE; - temp->data = NULL; - temp->data_len = 0; - - int ret = 0; - int read_len = 0; - unsigned int total_len = 0; - unsigned int is_data = FALSE; - - do { - // total length - ret = TEMP_FAILURE_RETRY(read(fd, (void *)&total_len, sizeof(unsigned int))); - if (ret < 0) { ERROR("read error"); break; } - read_len += ret; - - // client_id - ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->client_id_len), sizeof(unsigned int))); - if (ret < 0) { ERROR("read error"); break; } - read_len += ret; - - temp->client_id = calloc(1, temp->client_id_len+1); - if (NULL == temp->client_id) { - ERROR("calloc() Fail"); - ret = -1; - break; - } - ret = socket_recv(fd, (void *)&(temp->client_id), temp->client_id_len); - if (ret < 0) { - ERROR("socket_recv error(%d)", ret); - break; - } - read_len += ret; - - // sequnce no - ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->seq_no), sizeof(unsigned int))); - if (ret < 0) { ERROR("read error"); break; } - read_len += ret; - - if (total_len == read_len) { - *data = temp; - *identity = true; - return read_len; - } - - // call_id - ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->call_id_len), sizeof(unsigned int))); - if (ret < 0) { ERROR("read error"); break; } - read_len += ret; - - temp->call_id = calloc(1, temp->call_id_len+1); - if (NULL == temp->call_id) { - ERROR("calloc() Fail"); - ret = -1; - break; - } - ret = socket_recv(fd, (void *)&(temp->call_id), temp->call_id_len); - if (ret < 0) { - ERROR("socket_recv error(%d)", ret); - break; - } - read_len += ret; - - // is_data - ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(is_data), sizeof(unsigned int))); - if (ret < 0) { ERROR("read error"); break; } - read_len += ret; - - // data - if (is_data) { - temp->is_data = TRUE; - ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->data_len), sizeof(unsigned int))); - if (ret < 0) { ERROR("read error"); break; } - read_len += ret; - - temp->data = calloc(1, temp->data_len+1); - if (NULL == temp->data) { - ERROR("calloc() Fail"); - ret = -1; - break; - } - ret = socket_recv(fd, (void *)&(temp->data), temp->data_len); - if (ret < 0) { - ERROR("socket_recv error(%d)", ret); - break; - } - read_len += ret; - } - - INFO("client_id : %s, call_id : %s, seq_no : %d", temp->client_id, temp->call_id, temp->seq_no); - - *data = temp; - *identity = false; - } while(0); - - if (ret < 0) { - ERROR("total_len(%d) client_id_len(%d)", total_len, temp->client_id_len); - __free_raw_data(temp); - *data = NULL; - *identity = false; - return -1; - } - - return read_len; -} - -static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data) -{ - int ret; - int event_fd = g_io_channel_unix_get_fd(src); - char *client_id = NULL; - pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data; - if (NULL == ipc_svc) { - ERROR("ipc_svc is NULL"); - return FALSE; - } - - if (G_IO_HUP & condition) { - INFO("client closed ------------------------client_fd : %d", event_fd); - - close(event_fd); - - // Find client_id - __find_client_id(ipc_svc, event_fd, true, &client_id); - - // Send client_id to manager to terminate worker thread - if (client_id) { - pthread_mutex_lock(&ipc_svc->client_info_mutex); - g_hash_table_remove(ipc_svc->client_info_map, client_id); - pthread_mutex_unlock(&ipc_svc->client_info_mutex); - - pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex); - ipc_svc->manager_queue_from_epoll = g_list_append(ipc_svc->manager_queue_from_epoll, (void*)g_strdup(client_id)); - pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex); - write_command(ipc_svc->manager, 1); - __delete_request_queue(ipc_svc, client_id); - free(client_id); - } - - return FALSE; - } - - // receive data from client - int recv_len; - bool identity = false; - pims_ipc_raw_data_s *req = NULL; - - recv_len = __recv_raw_data(event_fd, &req, &identity); - if (recv_len > 0) { - // send command to router - if (identity) { - pims_ipc_client_map_s *client = (pims_ipc_client_map_s*)calloc(1, sizeof(pims_ipc_client_map_s)); - if (NULL == client) { - ERROR("calloc() Fail"); - close(event_fd); - return FALSE; - } - - client->fd = event_fd; - - char temp[100]; - snprintf(temp, sizeof(temp), "%d_%s", ipc_svc->unique_sequence_number++, req->client_id); - client->id = strdup(temp); - free(req->client_id); - req->client_id = NULL; - ipc_svc->client_id_fd_map = g_list_append(ipc_svc->client_id_fd_map, client); - - // send server pid to client - snprintf(temp, sizeof(temp), "%x", getpid()); - ret = __send_identify(event_fd, req->seq_no, temp, strlen(temp)); - - __free_raw_data(req); - if (ret < 0) { - ERROR("__send_identify() Fail(%d)", ret); - close(event_fd); - return FALSE; - } - - pims_ipc_client_info_s *client_info = NULL; - if (0 != _create_client_info(event_fd, &client_info)) - ERROR("_create_client_info() Fail"); - pthread_mutex_lock(&ipc_svc->client_info_mutex); - g_hash_table_insert(ipc_svc->client_info_map, g_strdup(client->id), client_info); - pthread_mutex_unlock(&ipc_svc->client_info_mutex); - - return TRUE; - } - - __find_client_id(ipc_svc, event_fd, false, &client_id); - - if (client_id) { - __request_push(ipc_svc, client_id, event_fd, req); - write_command(ipc_svc->router, 1); - } - else - ERROR("__find_client_id fail : event_fd (%d)", event_fd); - } - else { - ERROR("receive invalid : %d", event_fd); - close(event_fd); - return FALSE; - } - - return TRUE; -} - -static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data) -{ - GIOChannel *channel; - pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data; - int client_sockfd = -1; - int sockfd = ipc_svc->sockfd; - struct sockaddr_un clientaddr; - socklen_t client_len = sizeof(clientaddr); - - client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len); - if (-1 == client_sockfd) { - char *errmsg = NULL; - char buf[1024] = {0}; - errmsg = strerror_r(errno, buf, sizeof(buf)); - if (errmsg) - ERROR("accept error : %s", errmsg); - return TRUE; - } - - channel = g_io_channel_unix_new(client_sockfd); - g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data); - g_io_channel_unref(channel); - - return TRUE; -} - -static void* __main_loop(void *user_data) -{ - int ret; - struct sockaddr_un addr; - GIOChannel *gio = NULL; - pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)user_data; - - if (sd_listen_fds(1) == 1 && sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, ipc_svc->service, 0) > 0) { - ipc_svc->sockfd = SD_LISTEN_FDS_START; - } - else { - unlink(ipc_svc->service); - ipc_svc->sockfd = socket(PF_UNIX, SOCK_STREAM, 0); - - bzero(&addr, sizeof(addr)); - addr.sun_family = AF_UNIX; - snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service); - - ret = bind(ipc_svc->sockfd, (struct sockaddr *)&addr, sizeof(addr)); - if (ret != 0) - ERROR("bind error :%d", ret); - ret = listen(ipc_svc->sockfd, 30); - - ret = chown(ipc_svc->service, getuid(), ipc_svc->group); - ret = chmod(ipc_svc->service, ipc_svc->mode); - } - - gio = g_io_channel_unix_new(ipc_svc->sockfd); - - g_io_add_watch(gio, G_IO_IN, __socket_handler, (gpointer)ipc_svc); - - return NULL; -} - -static int __open_router_fd(pims_ipc_svc_s *ipc_svc) -{ - int ret = -1; - int flags; - int router; - int manager; - - // router inproc eventfd - router = eventfd(0,0); - if (-1 == router) { - ERROR("eventfd error : %d", errno); - return -1; - } - VERBOSE("router :%d\n", router); - - flags = fcntl(router, F_GETFL, 0); - if (flags == -1) - flags = 0; - ret = fcntl (router, F_SETFL, flags | O_NONBLOCK); - if (0 != ret) - VERBOSE("rounter fcntl : %d\n", ret); - - // manager inproc eventfd - manager = eventfd(0,0); - if (-1 == manager) { - ERROR("eventfd error : %d", errno); - close(router); - return -1; - } - VERBOSE("manager :%d\n", manager); - - flags = fcntl(manager, F_GETFL, 0); - if (flags == -1) - flags = 0; - ret = fcntl (manager, F_SETFL, flags | O_NONBLOCK); - if (0 != ret) - VERBOSE("manager fcntl : %d\n", ret); - - ipc_svc->router = router; - ipc_svc->manager = manager; - - return 0; -} - -static void __close_router_fd(pims_ipc_svc_s *ipc_svc) -{ - close(ipc_svc->router); - close(ipc_svc->manager); -} - -static void* __publish_loop(void *user_data) -{ - int ret; - int epfd; - - struct sockaddr_un addr; - struct epoll_event ev = {0}; - pims_ipc_svc_for_publish_s *ipc_svc = (pims_ipc_svc_for_publish_s*)user_data; - - unlink(ipc_svc->service); - ipc_svc->publish_sockfd = socket(PF_UNIX, SOCK_STREAM, 0); - - bzero(&addr, sizeof(struct sockaddr_un)); - addr.sun_family = AF_UNIX; - snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service); - - int flags = fcntl (ipc_svc->publish_sockfd, F_GETFL, 0); - if (flags == -1) - flags = 0; - ret = fcntl (ipc_svc->publish_sockfd, F_SETFL, flags | O_NONBLOCK); - VERBOSE("publish socketfd fcntl : %d\n", ret); - - ret = bind(ipc_svc->publish_sockfd, (struct sockaddr *)&(addr), sizeof(struct sockaddr_un)); - if (ret != 0) - ERROR("bind error :%d", ret); - ret = listen(ipc_svc->publish_sockfd, 30); - WARN_IF(ret != 0, "listen error :%d", ret); - - ret = chown(ipc_svc->service, getuid(), ipc_svc->group); - WARN_IF(ret != 0, "chown error :%d", ret); - ret = chmod(ipc_svc->service, ipc_svc->mode); - WARN_IF(ret != 0, "chmod error :%d", ret); - - epfd = epoll_create(MAX_EPOLL_EVENT); - - ev.events = EPOLLIN | EPOLLHUP; - ev.data.fd = ipc_svc->publish_sockfd; - - ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ipc_svc->publish_sockfd, &ev); - WARN_IF(ret != 0, "listen error :%d", ret); - - while (!ipc_svc->epoll_stop_thread) { - int i = 0; - struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, }; - int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1); - - if (ipc_svc->epoll_stop_thread) - break; - - if (event_num == -1) { - if (errno != EINTR) { - ERROR("errno:%d\n", errno); - break; - } - } - - for (i = 0; i < event_num; i++) { - int event_fd = events[i].data.fd; - - if (events[i].events & EPOLLHUP) { - VERBOSE("client closed -----------------------------------------:%d", event_fd); - if (epoll_ctl(epfd, EPOLL_CTL_DEL, event_fd, events) == -1) { - ERROR("epoll_ctl (EPOLL_CTL_DEL) fail : errno(%d)", errno); - } - close(event_fd); - - // Find client_id and delete - GList *cursor = NULL; - - pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); - cursor = g_list_first(ipc_svc->subscribe_fds); - while (cursor) { - if (event_fd == (int)cursor->data) { - ipc_svc->subscribe_fds = g_list_delete_link(ipc_svc->subscribe_fds, cursor); - break; - } - cursor = g_list_next(cursor); - } - pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); - continue; - } - else if (event_fd == ipc_svc->publish_sockfd) { // connect client - struct sockaddr_un remote; - int remote_len = sizeof(remote); - int client_fd = accept(ipc_svc->publish_sockfd, (struct sockaddr *)&remote, (socklen_t*) &remote_len); - if (client_fd == -1) { - ERROR("accept fail : errno : %d", errno); - continue; - } - VERBOSE("client subscriber connect: %d", client_fd); - - pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex); - ipc_svc->subscribe_fds = g_list_append(ipc_svc->subscribe_fds, (void*)client_fd); - pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex); - - ev.events = EPOLLIN; - ev.data.fd = client_fd; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev) == -1) { - ERROR("epoll_ctl (EPOLL_CTL_ADD) fail : error(%d)\n", errno); - continue; - } - } - } - } - - close(ipc_svc->publish_sockfd); - close(epfd); - - return NULL; -} - -static void __stop_for_publish(pims_ipc_svc_for_publish_s *ipc_svc) -{ - ipc_svc->epoll_stop_thread = true; -} - -static void* __router_loop(void *data) -{ - pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data; - int fd_count = 2; - struct pollfd *pollfds; - - pollfds = (struct pollfd*)calloc(fd_count, sizeof(struct pollfd)); - if (NULL == pollfds) { - ERROR("calloc() Fail"); - return NULL; - } - pollfds[0].fd = ipc_svc->router; - pollfds[0].events = POLLIN; - pollfds[1].fd = ipc_svc->manager; - pollfds[1].events = POLLIN; - - while (1) { - int ret = -1; - uint64_t dummy; - int check_router_queue = -1; - int check_manager_queue = -1; - - while (1) { - ret = poll(pollfds, fd_count, 1000); - if (ret == -1 && errno == EINTR) { - //free (pollfds); - continue; //return NULL; - } - break; - } - - if (ret > 0) { - if (pollfds[0].revents & POLLIN) { - // request router: send request to worker - if (sizeof (dummy) == read_command(pollfds[0].fd, &dummy)) { - check_router_queue = __process_router_event(ipc_svc, false); - } - } - - if (pollfds[1].revents & POLLIN) { - // worker manager - if (sizeof (dummy) == read_command(pollfds[1].fd, &dummy)) { - check_manager_queue = __process_manager_event(ipc_svc); - if (ipc_svc->delay_count > 0) - check_router_queue = __process_router_event(ipc_svc, true); - } - } - } - - // check queue - while(check_router_queue > 0 || check_manager_queue > 0) { - read_command(pollfds[0].fd, &dummy); - check_router_queue = __process_router_event(ipc_svc, false); - - read_command(pollfds[1].fd, &dummy); - check_manager_queue = __process_manager_event(ipc_svc); - if (ipc_svc->delay_count > 0) - check_router_queue = __process_router_event(ipc_svc, true); - } - } - - free(pollfds); - - return NULL; -} - -API void pims_ipc_svc_run_main_loop(GMainLoop* loop) -{ - int ret = -1; - GMainLoop* main_loop = loop; - - if (main_loop == NULL) { - main_loop = g_main_loop_new(NULL, FALSE); - } - - if (_g_singleton_for_publish) - __launch_thread(__publish_loop, _g_singleton_for_publish); - - if (_g_singleton) { - ret = __open_router_fd(_g_singleton); - ASSERT(ret == 0); - - int i; - // launch worker threads in advance - for (i = 0; i < _g_singleton->workers_max_count; i++) - __launch_thread(__worker_loop, _g_singleton); - - __launch_thread(__router_loop, _g_singleton); - __main_loop(_g_singleton); - } - - g_main_loop_run(main_loop); - - if (_g_singleton) - __close_router_fd(_g_singleton); - - if (_g_singleton_for_publish) - __stop_for_publish(_g_singleton_for_publish); - -} - -API void pims_ipc_svc_set_client_disconnected_cb(pims_ipc_svc_client_disconnected_cb callback, void *user_data) -{ - if (_client_disconnected_cb.callback) { - ERROR("already registered"); - return; - } - _client_disconnected_cb.callback = callback; - _client_disconnected_cb.user_data = user_data; -} - -API bool pims_ipc_svc_check_privilege(pims_ipc_h ipc, char *privilege) -{ - int ret; - int worker_id = (int)ipc; - pims_ipc_client_info_s *client_info = NULL; - pims_ipc_client_info_s *client_info_clone = NULL; - - if (NULL == privilege) { - ERROR("privilege is NULL"); - return false; - } - - pthread_mutex_lock(&_g_singleton->client_info_mutex); - client_info = g_hash_table_lookup(_g_singleton->worker_client_info_map, GINT_TO_POINTER(worker_id)); - if (NULL == client_info) { - ERROR("client_info is NULL"); - pthread_mutex_unlock(&_g_singleton->client_info_mutex); - return false; - } - client_info_clone = _clone_client_info(client_info); - pthread_mutex_unlock(&_g_singleton->client_info_mutex); - - if (NULL == client_info_clone) { - ERROR("client_info_clone is NULL"); - return false; - } - - pthread_mutex_lock(&_g_singleton->cynara_mutex); - ret = cynara_check(_g_singleton->cynara, client_info_clone->smack, client_info_clone->client_session, client_info_clone->uid, privilege); - pthread_mutex_unlock(&_g_singleton->cynara_mutex); - - _destroy_client_info(client_info_clone); - - if (CYNARA_API_ACCESS_ALLOWED == ret) - return true; - - return false; -} - -API int pims_ipc_svc_get_smack_label(pims_ipc_h ipc, char **p_smack) -{ - pims_ipc_client_info_s *client_info = NULL; - int worker_id = (int)ipc; - - pthread_mutex_lock(&_g_singleton->client_info_mutex); - client_info = g_hash_table_lookup(_g_singleton->worker_client_info_map, GINT_TO_POINTER(worker_id)); - if (NULL == client_info) { - ERROR("g_hash_table_lookup(%d) return NULL", worker_id); - pthread_mutex_unlock(&_g_singleton->client_info_mutex); - return -1; - } - - if (client_info->smack) { - *p_smack = strdup(client_info->smack); - if (NULL == *p_smack) { - ERROR("strdup() return NULL"); - pthread_mutex_unlock(&_g_singleton->client_info_mutex); - return -1; - } - } - pthread_mutex_unlock(&_g_singleton->client_info_mutex); - return 0; } diff --git a/src/pims-ipc-utils.c b/src/pims-ipc-utils.c new file mode 100644 index 0000000..230b9c2 --- /dev/null +++ b/src/pims-ipc-utils.c @@ -0,0 +1,48 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "pims-internal.h" + +void utils_launch_thread(void *(*start_routine)(void *), void *data) +{ + int ret = 0; + + pthread_t worker; + pthread_attr_t attr; + + /* set kernel thread */ + ret = pthread_attr_init(&attr); + if (0 != ret) { + ERR("pthread_attr_init() Fail(%d)", ret); + return; + } + ret = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); + if (0 != ret) { + ERR("pthread_attr_setscope() Fail(%d)", ret); + return; + } + ret = pthread_create(&worker, &attr, start_routine, data); + if (0 != ret) { + ERR("pthread_create() Fail(%d)", ret); + return; + } + + pthread_detach(worker); +} + diff --git a/src/pims-ipc-utils.h b/src/pims-ipc-utils.h new file mode 100644 index 0000000..f5cb79d --- /dev/null +++ b/src/pims-ipc-utils.h @@ -0,0 +1,26 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PIMS_UTILS_H__ +#define __PIMS_UTILS_H__ + +#define UTILS_STR_EQUAL 0 + +void utils_launch_thread(void *(*start_routine)(void *), void *data); + +#endif /*__PIMS_UTILS_H__*/ + diff --git a/src/pims-ipc-worker.c b/src/pims-ipc-worker.c new file mode 100644 index 0000000..7d46f10 --- /dev/null +++ b/src/pims-ipc-worker.c @@ -0,0 +1,638 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "pims-internal.h" +#include "pims-ipc-data-internal.h" +#include "pims-ipc-data.h" +#include "pims-socket.h" +#include "pims-ipc-utils.h" +#include "pims-ipc-worker.h" + +#define PIMS_IPC_WORKER_THREAD_WAIT_TIME 100 /* milliseconds */ + +typedef struct { + pims_ipc_svc_client_disconnected_cb callback; + void * user_data; +} pims_ipc_svc_client_disconnected_cb_t; + +/* idle_worker_pool SHOULD handle on main thread */ +static GList *idle_worker_pool; +static GHashTable *worker_cb_table; /* call_id, cb_data */ +static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL}; + +static int unique_sequence_number; +static GHashTable *worker_client_info_map; /* key : worker_id, data : pims_ipc_client_info_s* */ + +int worker_wait_idle_worker_ready(pims_ipc_worker_data_s *worker_data) +{ + struct timespec timeout = {0}; + + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_nsec += PIMS_IPC_WORKER_THREAD_WAIT_TIME * 1000000; + timeout.tv_sec += timeout.tv_nsec / 1000000000L; + timeout.tv_nsec = timeout.tv_nsec % 1000000000L; + + pthread_mutex_lock(&worker_data->ready_mutex); + + if (!worker_data->fd) { + WARN("worker fd is null, wait until worker thread create done."); + if (pthread_cond_timedwait(&worker_data->ready, &worker_data->ready_mutex, &timeout)) { + ERR("Get idle worker timeout Fail!"); + pthread_mutex_unlock(&worker_data->ready_mutex); + return -1; + } + } + + pthread_mutex_unlock(&worker_data->ready_mutex); + return 0; +} + +pims_ipc_worker_data_s* worker_get_idle_worker(pims_ipc_svc_s *ipc_svc, + const char *client_id) +{ + pims_ipc_worker_data_s *worker_data; + + RETV_IF(NULL == client_id, NULL); + RETVM_IF(NULL == idle_worker_pool, NULL, "There is no idle worker"); + + worker_data = g_hash_table_lookup(ipc_svc->client_worker_map, client_id); + if (worker_data) + return worker_data; + + worker_data = idle_worker_pool->data; + + idle_worker_pool = g_list_delete_link(idle_worker_pool, idle_worker_pool); + + if (worker_data) + g_hash_table_insert(ipc_svc->client_worker_map, g_strdup(client_id), worker_data); + + return worker_data; +} + +pims_ipc_worker_data_s* worker_find(pims_ipc_svc_s *ipc_svc, const char *client_id) +{ + pims_ipc_worker_data_s *worker_data; + + RETV_IF(NULL == client_id, NULL); + + if (FALSE == g_hash_table_lookup_extended(ipc_svc->client_worker_map, client_id, + NULL, (gpointer*)&worker_data)) { + ERR("g_hash_table_lookup_extended(%s) Fail", client_id); + return NULL; + } + + return worker_data; +} + +void worker_stop_client_worker(pims_ipc_svc_s *ipc_svc, const char *client_id) +{ + pims_ipc_worker_data_s *worker_data; + + worker_data = worker_find(ipc_svc, client_id); + + /* remove client_fd */ + g_hash_table_remove(ipc_svc->client_worker_map, client_id); + + /* stop worker thread */ + if (worker_data) { + worker_data->stop_thread = TRUE; + worker_data->client_fd = -1; + write_command(worker_data->fd, 1); + DBG("write command to worker terminate(worker_fd:%d)", worker_data->fd); + } +} + + +void worker_free_raw_data(void *data) +{ + pims_ipc_raw_data_s *raw_data = data; + + if (NULL == raw_data) + return; + + free(raw_data->client_id); + free(raw_data->call_id); + free(raw_data->data); + free(raw_data); +} + +void worker_free_data(gpointer data) +{ + pims_ipc_worker_data_s *worker_data = data; + + pthread_mutex_lock(&worker_data->queue_mutex); + if (worker_data->list) + g_list_free_full(worker_data->list, worker_free_raw_data); + pthread_mutex_unlock(&worker_data->queue_mutex); + + pthread_cond_destroy(&worker_data->ready); + pthread_mutex_destroy(&worker_data->ready_mutex); + + free(worker_data); +} + + +int worker_push_raw_data(pims_ipc_worker_data_s *worker_data, int client_fd, + pims_ipc_raw_data_s *data) +{ + pthread_mutex_lock(&worker_data->queue_mutex); + worker_data->list = g_list_append(worker_data->list, data); + worker_data->client_fd = client_fd; + pthread_mutex_unlock(&worker_data->queue_mutex); + + return TRUE; +} + +static gboolean worker_pop_raw_data(pims_ipc_worker_data_s *worker, + pims_ipc_raw_data_s **data) +{ + if (!worker) + return FALSE; + + pthread_mutex_lock(&worker->queue_mutex); + if (!worker->list) { + pthread_mutex_unlock(&worker->queue_mutex); + *data = NULL; + return FALSE; + } + + *data = g_list_first(worker->list)->data; + worker->list = g_list_delete_link(worker->list, g_list_first(worker->list)); + pthread_mutex_unlock(&worker->queue_mutex); + + return TRUE; +} + +int worker_set_callback(char *call_id, pims_ipc_svc_cb_s *cb_data) +{ + return g_hash_table_insert(worker_cb_table, call_id, cb_data); +} + + +static void __run_callback(int client_pid, char *call_id, pims_ipc_data_h dhandle_in, + pims_ipc_data_h *dhandle_out) +{ + pims_ipc_svc_cb_s *cb_data = NULL; + + VERBOSE("Call id [%s]", call_id); + + cb_data = g_hash_table_lookup(worker_cb_table, call_id); + if (cb_data == NULL) { + VERBOSE("No Data for %s", call_id); + return; + } + + /* TODO: client_pid is not valide pims_ipc_h */ + cb_data->callback((pims_ipc_h)client_pid, dhandle_in, dhandle_out, cb_data->user_data); +} + +static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data, + pims_ipc_raw_data_s **out) +{ + pims_ipc_data_s *data_in = data; + pims_ipc_raw_data_s *raw_data = NULL; + + RET_IF(NULL == out); + RET_IF(NULL == call_id); + + raw_data = calloc(1, sizeof(pims_ipc_raw_data_s)); + if (NULL == raw_data) { + ERR("calloc() Fail(%d)", errno); + return; + } + + raw_data->call_id = g_strdup(call_id); + raw_data->call_id_len = strlen(raw_data->call_id); + raw_data->seq_no = seq_no; + + if (data_in && 0 < data_in->buf_size) { + raw_data->has_data = TRUE; + raw_data->data = calloc(1, data_in->buf_size+1); + if (NULL == raw_data->data) { + ERR("calloc() Fail"); + free(raw_data->call_id); + free(raw_data); + return; + } + memcpy(raw_data->data, data_in->buf, data_in->buf_size); + raw_data->data_len = data_in->buf_size; + } else { + raw_data->has_data = FALSE; + raw_data->data_len = 0; + raw_data->data = NULL; + } + *out = raw_data; + return; +} + +static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data) +{ + int ret = 0; + unsigned int len, total_len, client_id_len; + + RETV_IF(NULL == data, -1); + RETV_IF(NULL == client_id, -1); + + client_id_len = strlen(client_id); + + len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(data->seq_no) + + data->call_id_len + sizeof(data->call_id) + sizeof(data->has_data); + total_len = len; + + if (data->has_data) { + len += sizeof(data->data_len); + total_len = len + data->data_len; + } + + INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id, + data->call_id, data->seq_no, len, total_len); + + int length = 0; + char buf[len+1]; + memset(buf, 0x0, len+1); + + memcpy(buf, &total_len, sizeof(total_len)); + length += sizeof(total_len); + + memcpy(buf+length, &client_id_len, sizeof(client_id_len)); + length += sizeof(client_id_len); + memcpy(buf+length, client_id, client_id_len); + length += client_id_len; + + memcpy(buf+length, &(data->seq_no), sizeof(data->seq_no)); + length += sizeof(data->seq_no); + + memcpy(buf+length, &(data->call_id_len), sizeof(data->call_id_len)); + length += sizeof(data->call_id_len); + memcpy(buf+length, data->call_id, data->call_id_len); + length += data->call_id_len; + + memcpy(buf+length, &(data->has_data), sizeof(data->has_data)); + length += sizeof(data->has_data); + + if (data->has_data) { + memcpy(buf+length, &(data->data_len), sizeof(data->data_len)); + length += sizeof(data->data_len); + ret = socket_send(fd, buf, length); + + /* send data */ + if (ret > 0) + ret += socket_send_data(fd, data->data, data->data_len); + } else { + ret = socket_send(fd, buf, length); + } + + return ret; +} + +static int _get_pid_from_fd(int fd) +{ + struct ucred uc; + socklen_t uc_len = sizeof(uc); + + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &uc_len) < 0) + ERR("getsockopt() Failed(%d)", errno); + + DBG("Client PID(%d)", uc.pid); + return uc.pid; +} + +static int __worker_loop_handle_raw_data(pims_ipc_worker_data_s *worker_data) +{ + int disconnected = FALSE; + pims_ipc_data_h data_in = NULL; + pims_ipc_data_h data_out = NULL; + pims_ipc_raw_data_s *result = NULL; + pims_ipc_raw_data_s *raw_data = NULL; + + if (FALSE == worker_pop_raw_data(worker_data, &raw_data)) + return disconnected; + + int client_pid = _get_pid_from_fd(worker_data->client_fd); + + if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id)) { + client_register_info(worker_data->client_fd, client_pid); + + } else if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id)) { + disconnected = TRUE; + } else { + data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len); + + __run_callback(client_pid, raw_data->call_id, data_in, &data_out); + pims_ipc_data_destroy(data_in); + } + + if (data_out) { + __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result); + pims_ipc_data_destroy(data_out); + } else + __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result); + + if (worker_data->client_fd != -1) + __send_raw_data(worker_data->client_fd, raw_data->client_id, result); + worker_free_raw_data(raw_data); + worker_free_raw_data(result); + + return disconnected; +} + +static void* __worker_loop(void *data) +{ + int ret; + pthread_t pid; + int worker_fd; + int disconnected = FALSE; + pims_ipc_worker_data_s *worker_data = data; + + RETV_IF(NULL == data, NULL); + + worker_fd = eventfd(0, 0); + if (worker_fd == -1) + return NULL; + + INFO("worker Created ********** worker_fd = %d ***********", worker_fd); + + pid = pthread_self(); + worker_data->client_fd = -1; + worker_data->stop_thread = FALSE; + pthread_mutex_lock(&worker_data->ready_mutex); + worker_data->fd = worker_fd; + pthread_cond_signal(&worker_data->ready); + pthread_mutex_unlock(&worker_data->ready_mutex); + + struct pollfd pollfds[1]; + pollfds[0].fd = worker_fd; + pollfds[0].events = POLLIN; + + while (!worker_data->stop_thread) { + ret = poll(pollfds, 1, 3000); /* waiting command from router */ + if (-1 == ret) { + if (errno != EINTR) + ERR("poll() Fail(%d)", errno); + continue; + } + if (worker_data->stop_thread) + break; + + if (0 == ret) + continue; + + if (pollfds[0].revents & POLLIN) { + uint64_t dummy; + read_command(pollfds[0].fd, &dummy); + + disconnected = __worker_loop_handle_raw_data(worker_data); + } + } + + if (!disconnected) + ERR("client fd closed, worker_fd : %d", worker_fd); + INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd); + + int client_pid = _get_pid_from_fd(worker_data->client_fd); + + worker_free_data(worker_data); + close(worker_fd); + + /* pthread_mutex_lock(&worker_data->client_mutex); */ + g_hash_table_remove(worker_client_info_map, GINT_TO_POINTER(client_pid)); + DBG("----------------removed(%d)", client_pid); + /* pthread_mutex_unlock(&worker_data->client_mutex); */ + + if (_client_disconnected_cb.callback) + _client_disconnected_cb.callback((pims_ipc_h)pid, _client_disconnected_cb.user_data); + + return NULL; +} + +void worker_start_idle_worker(pims_ipc_svc_s *ipc_data) +{ + int i; + pims_ipc_worker_data_s *worker_data; + + for (i = g_list_length(idle_worker_pool); i < ipc_data->workers_max_count; i++) { + worker_data = calloc(1, sizeof(pims_ipc_worker_data_s)); + if (NULL == worker_data) { + ERR("calloc() Fail(%d)", errno); + continue; + } + pthread_mutex_init(&worker_data->queue_mutex, 0); + pthread_mutex_init(&worker_data->ready_mutex, NULL); + pthread_cond_init(&worker_data->ready, NULL); + /* pthread_mutex_init(&worker_data->client_mutex, 0); */ + + utils_launch_thread(__worker_loop, worker_data); + idle_worker_pool = g_list_append(idle_worker_pool, worker_data); + } +} + +void worker_stop_idle_worker() +{ + GList *cursor; + pims_ipc_worker_data_s *worker_data; + + cursor = idle_worker_pool; + while (cursor) { + worker_data = cursor->data; + worker_data->stop_thread = TRUE; + write_command(worker_data->fd, 1); + cursor = cursor->next; + } +} + +void worker_init() +{ + worker_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + WARN_IF(NULL == worker_cb_table, "worker cb table is NULL"); +} + +void worker_deinit() +{ + g_list_free(idle_worker_pool); + idle_worker_pool = NULL; + + g_hash_table_destroy(worker_cb_table); + worker_cb_table = NULL; +} + +API void pims_ipc_svc_set_client_disconnected_cb( + pims_ipc_svc_client_disconnected_cb callback, void *user_data) +{ + if (_client_disconnected_cb.callback) { + ERR("already registered"); + return; + } + _client_disconnected_cb.callback = callback; + _client_disconnected_cb.user_data = user_data; +} + +void client_destroy_info(gpointer p) +{ + pims_ipc_client_info_s *client_info = p; + + if (NULL == client_info) + return; + free(client_info->smack); + free(client_info->uid); + free(client_info->client_session); + free(client_info); +} + +void client_init(void) +{ + unique_sequence_number = 0; + worker_client_info_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, + client_destroy_info); +} + +void client_deinit(void) +{ + DBG("----------destroied"); + g_hash_table_destroy(worker_client_info_map); +} + +static int _create_client_info(int fd, pims_ipc_client_info_s **p_client_info) +{ + int ret; + pid_t pid; + char errmsg[1024] = {0}; + + pims_ipc_client_info_s *client_info = calloc(1, sizeof(pims_ipc_client_info_s)); + if (NULL == client_info) { + ERR("calloc() return NULL"); + return -1; + } + + ret = cynara_creds_socket_get_client(fd, CLIENT_METHOD_SMACK, &(client_info->smack)); + if (CYNARA_API_SUCCESS != ret) { + cynara_strerror(ret, errmsg, sizeof(errmsg)); + ERR("cynara_creds_socket_get_client() Fail(%d,%s)", ret, errmsg); + client_destroy_info(client_info); + return -1; + } + + ret = cynara_creds_socket_get_user(fd, USER_METHOD_UID, &(client_info->uid)); + if (CYNARA_API_SUCCESS != ret) { + cynara_strerror(ret, errmsg, sizeof(errmsg)); + ERR("cynara_creds_socket_get_user() Fail(%d,%s)", ret, errmsg); + client_destroy_info(client_info); + return -1; + } + + ret = cynara_creds_socket_get_pid(fd, &pid); + if (CYNARA_API_SUCCESS != ret) { + cynara_strerror(ret, errmsg, sizeof(errmsg)); + ERR("cynara_creds_socket_get_pid() Fail(%d,%s)", ret, errmsg); + client_destroy_info(client_info); + return -1; + } + + client_info->client_session = cynara_session_from_pid(pid); + if (NULL == client_info->client_session) { + ERR("cynara_session_from_pid() return NULL"); + client_destroy_info(client_info); + return -1; + } + *p_client_info = client_info; + + return 0; +} + +int client_register_info(int client_fd, int client_pid) +{ + pims_ipc_client_info_s *client_info = NULL; + int ret = 0; + + ret = _create_client_info(client_fd, &client_info); + if (ret < 0) { + ERR("_create_client_info() Fail(%d)", ret); + return -1; + } + g_hash_table_insert(worker_client_info_map, GINT_TO_POINTER(client_pid), client_info); + DBG("-------inserted:pid(%d), info(%p)", client_pid, client_info); + + return 0; +} + +int client_get_unique_sequence_number(void) +{ + return unique_sequence_number++; +} + +pims_ipc_client_info_s* client_clone_info(pims_ipc_client_info_s *client_info) +{ + if (NULL == client_info) { + ERR("client_info is NULL"); + return NULL; + } + + pims_ipc_client_info_s *clone = calloc(1, sizeof(pims_ipc_client_info_s)); + if (NULL == clone) { + ERR("calloc() Fail"); + return NULL; + } + + if (client_info->smack) { + clone->smack = strdup(client_info->smack); + if (NULL == clone->smack) { + ERR("strdup() Fail"); + client_destroy_info(clone); + return NULL; + } + } + + if (client_info->uid) { + clone->uid = strdup(client_info->uid); + if (NULL == clone->uid) { + ERR("strdup() Fail"); + client_destroy_info(clone); + return NULL; + } + } + + if (client_info->client_session) { + clone->client_session = strdup(client_info->client_session); + if (NULL == clone->client_session) { + ERR("strdup() Fail"); + client_destroy_info(clone); + return NULL; + } + } + + return clone; +} + +pims_ipc_client_info_s* client_get_info(int client_pid) +{ + pims_ipc_client_info_s *client_info = NULL; + + client_info = g_hash_table_lookup(worker_client_info_map, GINT_TO_POINTER(client_pid)); + DBG("---------------get client_pid(%d)", client_pid); + return client_info; +} + diff --git a/src/pims-ipc-worker.h b/src/pims-ipc-worker.h new file mode 100644 index 0000000..60a0ea8 --- /dev/null +++ b/src/pims-ipc-worker.h @@ -0,0 +1,62 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PIMS_WORKER_H__ +#define __PIMS_WORKER_H__ + +#include "pims-ipc-data-internal.h" + +typedef struct { + pims_ipc_svc_call_cb callback; + void *user_data; +} pims_ipc_svc_cb_s; + +typedef struct { + char *smack; + char *uid; + char *client_session; +} pims_ipc_client_info_s; + +void worker_init(); +void worker_deinit(); +void worker_start_idle_worker(pims_ipc_svc_s *ipc_data); +void worker_stop_idle_worker(); +void worker_stop_client_worker(pims_ipc_svc_s *ipc_svc, const char *client_id); + + +int worker_push_raw_data(pims_ipc_worker_data_s *worker_data, int client_fd, + pims_ipc_raw_data_s *data); + +int worker_wait_idle_worker_ready(pims_ipc_worker_data_s *worker_data); +pims_ipc_worker_data_s* worker_get_idle_worker(pims_ipc_svc_s *ipc_svc, const char *client_id); +int worker_set_callback(char *call_id, pims_ipc_svc_cb_s *cb_data); +pims_ipc_worker_data_s* worker_find(pims_ipc_svc_s *ipc_svc, const char *client_id); + +void worker_free_data(gpointer data); +void worker_free_raw_data(void *data); + +void client_init(void); +void client_deinit(void); +int client_register_info(int client_fd, int client_pid); +void client_destroy_info(gpointer p); +int client_get_unique_sequence_number(void); +pims_ipc_client_info_s* client_get_info(int worker_id); +pims_ipc_client_info_s* client_clone_info(pims_ipc_client_info_s *client_info); + +#endif /*__PIMS_WORKER_H__*/ + + diff --git a/src/pims-ipc.c b/src/pims-ipc.c index 5e6d31f..0402dea 100644 --- a/src/pims-ipc.c +++ b/src/pims-ipc.c @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -24,19 +24,18 @@ #include #include #include -#include // pollfds -#include // sockaddr_un -#include // ioctl -#include //socket +#include /* pollfds */ +#include /* sockaddr_un */ +#include /* ioctl */ +#include /* socket */ #include -#include // epoll -#include // eventfd +#include /* epoll */ +#include /* eventfd */ #include #include #include "pims-internal.h" #include "pims-socket.h" -#include "pims-debug.h" #include "pims-ipc-data.h" #include "pims-ipc-data-internal.h" #include "pims-ipc.h" @@ -47,32 +46,27 @@ static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER; -typedef enum -{ +typedef enum { PIMS_IPC_CALL_STATUS_READY = 0, PIMS_IPC_CALL_STATUS_IN_PROGRESS } pims_ipc_call_status_e; -typedef enum -{ +typedef enum { PIMS_IPC_MODE_REQ = 0, PIMS_IPC_MODE_SUB } pims_ipc_mode_e; -typedef struct -{ +typedef struct { pims_ipc_subscribe_cb callback; void * user_data; } pims_ipc_cb_s; -typedef struct -{ +typedef struct { char *call_id; pims_ipc_data_h *handle; -}pims_ipc_subscribe_data_s; +} pims_ipc_subscribe_data_s; -typedef struct -{ +typedef struct { int fd; char *service; char *id; @@ -122,7 +116,7 @@ static void __pims_ipc_free_handle(pims_ipc_s *handle) { pthread_mutex_lock(&__gmutex); - handle->epoll_stop_thread = true; + handle->epoll_stop_thread = TRUE; if (handle->fd != -1) close(handle->fd); @@ -136,7 +130,7 @@ static void __pims_ipc_free_handle(pims_ipc_s *handle) g_free(handle->service); if (handle->async_channel) { - // remove a subscriber handle from the golbal list + /* remove a subscriber handle from the golbal list */ subscribe_handles = g_list_remove(subscribe_handles, handle); VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles)); @@ -148,16 +142,18 @@ static void __pims_ipc_free_handle(pims_ipc_s *handle) g_hash_table_destroy(handle->subscribe_cb_table); pthread_mutex_lock(&handle->data_queue_mutex); - if (handle->data_queue) { + if (handle->data_queue) g_list_free_full(handle->data_queue, __sub_data_free); - } + pthread_mutex_unlock(&handle->data_queue_mutex); pthread_mutex_destroy(&handle->data_queue_mutex); if (handle->subscribe_fd != -1) close(handle->subscribe_fd); - g_source_remove(handle->disconnected_source); + if (0 < handle->disconnected_source) + g_source_remove(handle->disconnected_source); + pthread_mutex_destroy(&handle->call_status_mutex); g_free(handle); @@ -193,16 +189,15 @@ static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle) } cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id); - if (cb_data == NULL) { + if (cb_data == NULL) VERBOSE("unable to find %s", call_id); - } else cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data); handle->data_queue = g_list_delete_link(handle->data_queue, cursor); __sub_data_free(data); pthread_mutex_unlock(&handle->data_queue_mutex); - } while(1); + } while (1); return 0; } @@ -211,16 +206,14 @@ static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condi { pims_ipc_s *handle = (pims_ipc_s *)data; - VERBOSE(""); - if (condition & G_IO_HUP) return FALSE; pthread_mutex_lock(&__gmutex); - // check if a subscriber handle is exists + /* check if a subscriber handle is exists */ if (g_list_find(subscribe_handles, handle) == NULL) { - ERROR("No such handle that ID is %p", handle); + ERR("No such handle that ID is %p", handle); pthread_mutex_unlock(&__gmutex); return FALSE; } @@ -245,30 +238,26 @@ static unsigned int __get_global_sequence_no() static int __pims_ipc_send_identify(pims_ipc_s *handle) { - unsigned int sequence_no; + unsigned int total_len, seq_no; unsigned int client_id_len = strlen(handle->id); - unsigned int len = sizeof(unsigned int) // total size - + client_id_len + sizeof(unsigned int) // client_id - + sizeof(unsigned int) ; // seq_no - char buf[len+1]; + total_len = sizeof(total_len) + sizeof(client_id_len)+client_id_len + sizeof(seq_no); + int length = 0; - memset(buf, 0x0, len+1); + char buf[total_len+1]; + memset(buf, 0x0, total_len+1); - // total len - memcpy(buf, (void*)&len, sizeof(unsigned int)); - length += sizeof(unsigned int); + memcpy(buf, &total_len, sizeof(total_len)); + length += sizeof(total_len); - // client_id - memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(handle->id), client_id_len); + memcpy(buf+length, &(client_id_len), sizeof(client_id_len)); + length += sizeof(client_id_len); + memcpy(buf+length, handle->id, client_id_len); length += client_id_len; - // seq_no - GET_CALL_SEQUNECE_NO(handle, sequence_no); - memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int)); - length += sizeof(unsigned int); + GET_CALL_SEQUNECE_NO(handle, seq_no); + memcpy(buf+length, &(seq_no), sizeof(seq_no)); + length += sizeof(seq_no); return socket_send(handle->fd, buf, length); } @@ -279,20 +268,20 @@ static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out) gboolean is_ok = FALSE; int len = 0; pims_ipc_data_h data = NULL; - unsigned int sequence_no = 0; + unsigned int seq_no = 0; char *client_id = NULL; char *call_id = NULL; char *buf = NULL; /* read the size of message. note that ioctl is non-blocking */ if (ioctl(handle->fd, FIONREAD, &len)) { - ERROR("ioctl failed: %d", errno); + ERR("ioctl failed: %d", errno); return -1; } /* when server or client closed socket */ if (len == 0) { - ERROR("[IPC Socket] connection is closed"); + ERR("[IPC Socket] connection is closed"); return -1; } @@ -301,99 +290,90 @@ static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out) unsigned int total_len = 0; unsigned int client_id_len = 0; unsigned int call_id_len = 0; - unsigned int is_data = FALSE; + unsigned int has_data = FALSE; - // get total_len - read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int))); + read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len))); - // client_id - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &(client_id_len), sizeof(client_id_len))); if (client_id_len > 0 && client_id_len < UINT_MAX-1) { client_id = calloc(1, client_id_len+1); if (client_id == NULL) { - ERROR("calloc fail"); + ERR("calloc fail"); break; } - } - else + } else break; - ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len); - if (ret < 0) { ERROR("socket_recv error"); break; } + ret = socket_recv(handle->fd, (void *)&client_id, client_id_len); + if (ret < 0) { ERR("socket_recv error"); break; } read_len += ret; - // sequence no - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &seq_no, sizeof(seq_no))); if (total_len == read_len) { - // send identity + /* send identity */ data = pims_ipc_data_create(0); if (NULL == data) { - ERROR("pims_ipc_data_create() Fail"); + ERR("pims_ipc_data_create() Fail"); break; } ret = pims_ipc_data_put(data, client_id, client_id_len); if (ret != 0) - WARNING("pims_ipc_data_put fail(%d)", ret); + WARN("pims_ipc_data_put fail(%d)", ret); break; } - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len))); if (call_id_len > 0 && call_id_len < UINT_MAX-1) { call_id = calloc(1, call_id_len+1); if (call_id == NULL) { - ERROR("calloc fail"); + ERR("calloc fail"); break; } - } - else + } else break; - ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len); - if (ret < 0) { ERROR("socket_recv error"); break; } + ret = socket_recv(handle->fd, (void *)&call_id, call_id_len); + if (ret < 0) { ERR("socket_recv error"); break; } read_len += ret; - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int))); - if (is_data) { + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data))); + if (has_data) { unsigned int data_len; - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len))); if (data_len > 0 && data_len < UINT_MAX-1) { buf = calloc(1, data_len+1); if (buf == NULL) { - ERROR("calloc fail"); + ERR("calloc fail"); break; } - } - else + } else break; - ret = socket_recv(handle->fd, (void *)&(buf), data_len); - if (ret < 0) { ERROR("socket_recv error"); break; } + ret = socket_recv(handle->fd, (void *)&buf, data_len); + if (ret < 0) { ERR("socket_recv error"); break; } read_len += ret; data = pims_ipc_data_steal_unmarshal(buf, data_len); if (NULL == data) { - ERROR("pims_ipc_data_steal_unmarshal() Fail"); + ERR("pims_ipc_data_steal_unmarshal() Fail"); break; } - buf = NULL; } - INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no); - } while(0); + INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, seq_no); + } while (0); free(client_id); free(call_id); free(buf); - if (sequence_no == handle->call_sequence_no) { - if (data_out != NULL) { + if (seq_no == handle->call_sequence_no) { + if (data_out != NULL) *data_out = data; - } else if (data) pims_ipc_data_destroy(data); is_ok = TRUE; - } - else { + } else { if (data) pims_ipc_data_destroy(data); - VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no); + VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, seq_no); } if (is_ok) @@ -405,27 +385,23 @@ static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out) static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out) { int ret = -1; - struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd)); - if (NULL == pollfds) { - ERROR("calloc() Fail"); - return -1; - } + struct pollfd pollfds[1]; pollfds[0].fd = handle->fd; pollfds[0].events = POLLIN | POLLERR | POLLHUP; - while(1) { - while(1) { + while (1) { + while (1) { ret = poll(pollfds, 1, 1000); - if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) { + if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) continue; - } + break; } if (ret > 0) { if (pollfds[0].revents & (POLLERR|POLLHUP)) { - ERROR("Server disconnected"); + ERR("Server disconnected"); ret = -1; break; } @@ -435,29 +411,29 @@ static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out) } } } - free (pollfds); + return ret; } static int __open_subscribe_fd(pims_ipc_s *handle) { - // router inproc eventfd - int subscribe_fd = eventfd(0,0); - int flags; int ret; + int flags; + int subscribe_fd = eventfd(0, 0); if (-1 == subscribe_fd) { - ERROR("eventfd error : %d", errno); + ERR("eventfd error : %d", errno); return -1; } VERBOSE("subscribe :%d\n", subscribe_fd); - flags = fcntl (subscribe_fd, F_GETFL, 0); + flags = fcntl(subscribe_fd, F_GETFL, 0); if (flags == -1) flags = 0; - ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK); + + ret = fcntl(subscribe_fd, F_SETFL, flags | O_NONBLOCK); if (0 != ret) - VERBOSE("subscribe fcntl : %d\n", ret); + ERR("fcntl() Fail(%d)", errno); handle->subscribe_fd = subscribe_fd; return 0; @@ -474,7 +450,7 @@ static int __subscribe_data(pims_ipc_s * handle) do { /* read the size of message. note that ioctl is non-blocking */ if (ioctl(handle->fd, FIONREAD, &len)) { - ERROR("ioctl failed: %d", errno); + ERR("ioctl failed: %d", errno); break; } @@ -488,59 +464,54 @@ static int __subscribe_data(pims_ipc_s * handle) unsigned int read_len = 0; unsigned int total_len = 0; unsigned int call_id_len = 0; - unsigned int is_data = FALSE; + unsigned int has_data = FALSE; - // get total_len - read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int))); + read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len))); - // call_id - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len))); if (call_id_len > 0 && call_id_len < UINT_MAX-1) { call_id = calloc(1, call_id_len+1); if (call_id == NULL) { - ERROR("calloc fail"); + ERR("calloc fail"); break; } - } - else + } else break; - ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len); - if (ret < 0) { ERROR("socket_recv error"); break; } + ret = socket_recv(handle->fd, (void *)&call_id, call_id_len); + if (ret < 0) { ERR("socket_recv error"); break; } read_len += ret; - // is_data - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data))); - if (is_data) { + if (has_data) { unsigned int data_len; - read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int))); + read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len))); if (data_len > 0 && data_len < UINT_MAX-1) { buf = calloc(1, data_len+1); if (buf == NULL) { - ERROR("calloc fail"); + ERR("calloc fail"); break; } - } - else + } else break; - ret = socket_recv(handle->fd, (void *)&(buf), data_len); + ret = socket_recv(handle->fd, (void *)&buf, data_len); if (ret < 0) { - ERROR("socket_recv error"); + ERR("socket_recv error(%d)", ret); break; } read_len += ret; dhandle = pims_ipc_data_steal_unmarshal(buf, data_len); if (NULL == dhandle) { - ERROR("pims_ipc_data_steal_unmarshal() Fail"); + ERR("pims_ipc_data_steal_unmarshal() Fail"); break; } - buf = NULL; - pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s)); + pims_ipc_subscribe_data_s *sub_data; + sub_data = calloc(1, sizeof(pims_ipc_subscribe_data_s)); if (NULL == sub_data) { - ERROR("calloc() Fail"); + ERR("calloc() Fail"); pims_ipc_data_destroy(dhandle); ret = -1; break; @@ -555,7 +526,7 @@ static int __subscribe_data(pims_ipc_s * handle) write_command(handle->subscribe_fd, 1); } ret = 0; - } while(0); + } while (0); free(call_id); free(buf); @@ -567,7 +538,7 @@ static gboolean __hung_up_cb(gpointer data) GList *cursor = NULL; if (NULL == disconnected_list) { - DEBUG("No disconnected list"); + DBG("No disconnected list"); return FALSE; } @@ -576,7 +547,7 @@ static gboolean __hung_up_cb(gpointer data) while (cursor) { pims_ipc_server_disconnected_cb_t *disconnected = cursor->data; if (disconnected && disconnected->handle == data && disconnected->callback) { - DEBUG("call hung_up callback"); + DBG("call hung_up callback"); disconnected->callback(disconnected->user_data); break; } @@ -610,7 +581,6 @@ static void* __io_thread(void *data) int i = 0; pthread_mutex_lock(&__gmutex); - if (handle->epoll_stop_thread) { pthread_mutex_unlock(&__gmutex); break; @@ -630,7 +600,7 @@ static void* __io_thread(void *data) if (event_num == -1) { if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { - ERROR("errno:%d\n", errno); + ERR("errno:%d\n", errno); break; } } @@ -638,16 +608,16 @@ static void* __io_thread(void *data) pthread_mutex_lock(&__gmutex); for (i = 0; i < event_num; i++) { if (events[i].events & EPOLLHUP) { - ERROR("server fd closed"); - handle->epoll_stop_thread = true; + ERR("server fd closed"); + handle->epoll_stop_thread = TRUE; break; } if (events[i].events & EPOLLIN) { - if(__subscribe_data(handle) < 0) { - ERROR("server fd closed"); + if (__subscribe_data(handle) < 0) { + ERR("server fd closed"); g_idle_add(__hung_up_cb, handle); - handle->epoll_stop_thread = true; + handle->epoll_stop_thread = TRUE; break; } } @@ -663,18 +633,19 @@ static void* __io_thread(void *data) static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data) { if (G_IO_HUP & condition) { - DEBUG("hung up"); + DBG("hung up"); __hung_up_cb(data); return FALSE; + } else if (G_IO_IN & condition) { char buf[1] = {0}; if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) { - DEBUG("hung up"); + DBG("hung up"); __hung_up_cb(data); return FALSE; } } else { - ERROR("Invalid condition (%d)", condition); + ERR("Invalid condition (%d)", condition); } return TRUE; } @@ -695,7 +666,7 @@ static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode) handle = g_new0(pims_ipc_s, 1); if (handle == NULL) { - ERROR("Failed to allocation"); + ERR("Failed to allocation"); break; } @@ -705,14 +676,15 @@ static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode) handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no()); handle->fd = socket(PF_UNIX, SOCK_STREAM, 0); if (handle->fd < 0) { - ERROR("socket error : %d, errno: %d", handle->fd, errno); + ERR("socket error : %d, errno: %d", handle->fd, errno); break; } - int flags = fcntl (handle->fd, F_GETFL, 0); + int flags = fcntl(handle->fd, F_GETFL, 0); if (flags == -1) flags = 0; - ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK); - VERBOSE("socket fcntl : %d\n", ret); + ret = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK); + if (0 != ret) + ERR("fcntl() Fail(%d)", errno); pthread_mutex_init(&handle->call_status_mutex, 0); @@ -726,29 +698,30 @@ static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode) ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (ret != 0) { - ERROR("connect error : %d, errno: %d", ret, errno); + ERR("connect error : %d, errno: %d", ret, errno); break; } VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret); if (mode == PIMS_IPC_MODE_REQ) { GIOChannel *ch = g_io_channel_unix_new(handle->fd); - handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP, _g_io_hup_cb, handle); + handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP, + _g_io_hup_cb, handle); + g_io_channel_unref(ch); handle->call_sequence_no = (unsigned int)time(NULL); ret = __pims_ipc_send_identify(handle); if (ret < 0) { - ERROR("__pims_ipc_send_identify error"); + ERR("__pims_ipc_send_identify error"); break; } __pims_ipc_receive(handle, NULL); - if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) { - WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed"); - } - } - else { - handle->epoll_stop_thread = false; + if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) + WARN("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed"); + + } else { + handle->epoll_stop_thread = FALSE; pthread_mutex_init(&handle->data_queue_mutex, 0); pthread_mutex_lock(&handle->data_queue_mutex); @@ -767,22 +740,24 @@ static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode) GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd); if (!async_channel) { - ERROR("g_io_channel_unix_new error"); + ERR("g_io_channel_unix_new error"); break; } handle->async_channel = async_channel; - handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle); - handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + handle->async_source_id = g_io_add_watch(handle->async_channel, + G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle); + handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, + g_free, g_free); ASSERT(handle->subscribe_cb_table); - // add a subscriber handle to the global list + /* add a subscriber handle to the global list */ subscribe_handles = g_list_append(subscribe_handles, handle); VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles)); } is_ok = TRUE; VERBOSE("A new handle is created : %s, %s", handle->service, handle->id); - } while(0); + } while (0); pthread_mutex_unlock(&__gmutex); @@ -808,11 +783,12 @@ API pims_ipc_h pims_ipc_create_for_subscribe(char *service) static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode) { - pims_ipc_s *handle = (pims_ipc_s *)ipc; + pims_ipc_s *handle = ipc; if (mode == PIMS_IPC_MODE_REQ) { - if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) { - WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed"); + if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, + NULL, NULL) != 0) { + WARN("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed"); } } @@ -833,71 +809,62 @@ API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc) static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in) { int ret = -1; - unsigned int sequence_no = 0; + int length = 0; + unsigned int total_len; + unsigned int seq_no = 0; gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function); unsigned int call_id_len = strlen(call_id); pims_ipc_data_s *data = NULL; - unsigned int is_data = FALSE; + unsigned int has_data = FALSE; unsigned int client_id_len = strlen(handle->id); - int length = 0; - GET_CALL_SEQUNECE_NO(handle, sequence_no); + GET_CALL_SEQUNECE_NO(handle, seq_no); - int len = sizeof(unsigned int) // total size - + client_id_len + sizeof(unsigned int) // client_id - + sizeof(unsigned int) // seq_no - + call_id_len + sizeof(unsigned int) // call_id - + sizeof(unsigned int); // is data - - int total_len = len; + int len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(seq_no) + + call_id_len + sizeof(call_id_len) + sizeof(has_data); + total_len = len; if (data_in) { - is_data = TRUE; - data = (pims_ipc_data_s*)data_in; + has_data = TRUE; + data = data_in; len += sizeof(unsigned int); total_len = len + data->buf_size; } - INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no); + INFO("len(%d),client_id(%s),call_id(%s),seq_no(%d)", len, handle->id, call_id, seq_no); char buf[len+1]; - memset(buf, 0x0, len+1); - memcpy(buf, (void*)&total_len, sizeof(unsigned int)); - length += sizeof(unsigned int); + memcpy(buf, &total_len, sizeof(total_len)); + length += sizeof(total_len); - // client_id client_id_len = strlen(handle->id); - memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(handle->id), client_id_len); + memcpy(buf+length, &client_id_len, sizeof(client_id_len)); + length += sizeof(client_id_len); + memcpy(buf+length, handle->id, client_id_len); length += client_id_len; - // seq_no - memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int)); - length += sizeof(unsigned int); + memcpy(buf+length, &seq_no, sizeof(seq_no)); + length += sizeof(seq_no); - // call id - memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int)); - length += sizeof(unsigned int); - memcpy(buf+length, (void*)(call_id), call_id_len); + memcpy(buf+length, &call_id_len, sizeof(call_id_len)); + length += sizeof(call_id_len); + memcpy(buf+length, call_id, call_id_len); length += call_id_len; g_free(call_id); - // is_data - memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int)); - length += sizeof(unsigned int); + memcpy(buf+length, &has_data, sizeof(has_data)); + length += sizeof(has_data); - if (is_data) { - memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int)); - length += sizeof(unsigned int); + if (has_data) { + memcpy(buf+length, &(data->buf_size), sizeof(data->buf_size)); + length += sizeof(data->buf_size); ret = socket_send(handle->fd, buf, length); if (ret > 0) ret = socket_send_data(handle->fd, data->buf, data->buf_size); - } - else { + } else { ret = socket_send(handle->fd, buf, length); } @@ -910,47 +877,37 @@ static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pim API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, pims_ipc_data_h *data_out) { - pims_ipc_s *handle = (pims_ipc_s *)ipc; - - - if (ipc == NULL) { - ERROR("invalid handle : %p", ipc); - return -1; - } + pims_ipc_s *handle = ipc; - if (!module || !function) { - ERROR("invalid argument"); - return -1; - } + RETV_IF(NULL == ipc, -1); + RETV_IF(NULL == module, -1); + RETV_IF(NULL == function, -1); pthread_mutex_lock(&handle->call_status_mutex); if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) { pthread_mutex_unlock(&handle->call_status_mutex); - ERROR("the previous call is in progress : %p", ipc); + ERR("the previous call is in progress : %p", ipc); return -1; } pthread_mutex_unlock(&handle->call_status_mutex); - - if (__pims_ipc_send(handle, module, function, data_in) != 0) { + if (__pims_ipc_send(handle, module, function, data_in) != 0) return -1; - } - if (__pims_ipc_receive(handle, data_out) != 0) { + if (__pims_ipc_receive(handle, data_out) != 0) return -1; - } return 0; } static gboolean __call_async_idler_cb(gpointer data) { - VERBOSE(""); + pims_ipc_s *handle = data; + pims_ipc_data_h dhandle; - pims_ipc_s *handle = (pims_ipc_s *)data; - ASSERT(handle); - ASSERT(handle->dhandle_for_async_idler); - pims_ipc_data_h dhandle = handle->dhandle_for_async_idler; + RETV_IF(NULL == handle, FALSE); + + dhandle = handle->dhandle_for_async_idler; handle->dhandle_for_async_idler = NULL; pthread_mutex_lock(&handle->call_status_mutex); @@ -963,9 +920,10 @@ static gboolean __call_async_idler_cb(gpointer data) return FALSE; } -static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data) +static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, + gpointer data) { - pims_ipc_s *handle = (pims_ipc_s *)data; + pims_ipc_s *handle = data; pims_ipc_data_h dhandle = NULL; if (__pims_ipc_receive(handle, &dhandle) == 0) { @@ -975,19 +933,18 @@ static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition cond if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) { pthread_mutex_unlock(&handle->call_status_mutex); pims_ipc_data_destroy(dhandle); - } - else { + } else { pthread_mutex_unlock(&handle->call_status_mutex); - if (src == NULL) { // A response is arrived too quickly + if (src == NULL) { /* A response is arrived too quickly */ handle->dhandle_for_async_idler = dhandle; g_idle_add(__call_async_idler_cb, handle); - } - else { + } else { pthread_mutex_lock(&handle->call_status_mutex); handle->call_status = PIMS_IPC_CALL_STATUS_READY; pthread_mutex_unlock(&handle->call_status_mutex); - handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata); + handle->call_async_callback((pims_ipc_h)handle, dhandle, + handle->call_async_userdata); pims_ipc_data_destroy(dhandle); } } @@ -995,26 +952,21 @@ static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition cond return FALSE; } -API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, - pims_ipc_call_async_cb callback, void *userdata) +API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, + pims_ipc_data_h data_in, pims_ipc_call_async_cb callback, void *user_data) { - pims_ipc_s *handle = (pims_ipc_s *)ipc; + pims_ipc_s *handle = ipc; guint source_id = 0; - if (ipc == NULL) { - ERROR("invalid handle : %p", ipc); - return -1; - } - - if (!module || !function || !callback) { - ERROR("invalid argument"); - return -1; - } + RETV_IF(NULL == ipc, -1); + RETV_IF(NULL == module, -1); + RETV_IF(NULL == function, -1); + RETV_IF(NULL == callback, -1); pthread_mutex_lock(&handle->call_status_mutex); if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) { pthread_mutex_unlock(&handle->call_status_mutex); - ERROR("the previous call is in progress : %p", ipc); + ERR("the previous call is in progress : %p", ipc); return -1; } pthread_mutex_unlock(&handle->call_status_mutex); @@ -1024,18 +976,19 @@ API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_i pthread_mutex_unlock(&handle->call_status_mutex); handle->call_async_callback = callback; - handle->call_async_userdata = userdata; + handle->call_async_userdata = user_data; - // add a callback for GIOChannel + /* add a callback for GIOChannel */ if (!handle->async_channel) { handle->async_channel = g_io_channel_unix_new(handle->fd); if (!handle->async_channel) { - ERROR("g_io_channel_unix_new error"); + ERR("g_io_channel_unix_new error"); return -1; } } - source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle); + source_id = g_io_add_watch(handle->async_channel, G_IO_IN, + __pims_ipc_call_async_handler, handle); handle->async_source_id = source_id; if (__pims_ipc_send(handle, module, function, data_in) != 0) { @@ -1048,47 +1001,41 @@ API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_i return 0; } -API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc) +API int pims_ipc_is_call_in_progress(pims_ipc_h ipc) { int ret; - pims_ipc_s *handle = (pims_ipc_s *)ipc; + pims_ipc_s *handle = ipc; - if (ipc == NULL) { - ERROR("invalid handle : %p", ipc); - return false; - } + RETV_IF(NULL == ipc, FALSE); pthread_mutex_lock(&handle->call_status_mutex); if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS) - ret = true; + ret = TRUE; else - ret = false; + ret = FALSE; pthread_mutex_unlock(&handle->call_status_mutex); return ret; } -API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata) +API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, + pims_ipc_subscribe_cb callback, void *user_data) { gchar *call_id = NULL; + pims_ipc_s *handle = ipc; pims_ipc_cb_s *cb_data = NULL; - pims_ipc_s *handle = (pims_ipc_s *)ipc; - if (ipc == NULL || handle->subscribe_cb_table == NULL) { - ERROR("invalid handle : %p", ipc); - return -1; - } - - if (!module || !event || !callback) { - ERROR("invalid argument"); - return -1; - } + RETV_IF(NULL == ipc, -1); + RETV_IF(NULL == module, -1); + RETV_IF(NULL == event, -1); + RETV_IF(NULL == callback, -1); + RETV_IF(NULL == handle->subscribe_cb_table, -1); cb_data = g_new0(pims_ipc_cb_s, 1); call_id = PIMS_IPC_MAKE_CALL_ID(module, event); VERBOSE("subscribe cb id[%s]", call_id); cb_data->callback = callback; - cb_data->user_data = userdata; + cb_data->user_data = user_data; g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data); return 0; @@ -1097,24 +1044,19 @@ API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_s API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event) { gchar *call_id = NULL; - pims_ipc_s *handle = (pims_ipc_s *)ipc; + pims_ipc_s *handle = ipc; - if (ipc == NULL || handle->subscribe_cb_table == NULL) { - ERROR("invalid handle : %p", ipc); - return -1; - } - - if (!module || !event) { - ERROR("invalid argument"); - return -1; - } + RETV_IF(NULL == ipc, -1); + RETV_IF(NULL == module, -1); + RETV_IF(NULL == event, -1); + RETV_IF(NULL == handle->subscribe_cb_table, -1); call_id = PIMS_IPC_MAKE_CALL_ID(module, event); VERBOSE("unsubscribe cb id[%s]", call_id); if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) { - ERROR("g_hash_table_remove error"); + ERR("g_hash_table_remove error"); g_free(call_id); return -1; } @@ -1123,7 +1065,8 @@ API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event) return 0; } -API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle, pims_ipc_server_disconnected_cb callback, void *user_data) +API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle, + pims_ipc_server_disconnected_cb callback, void *user_data) { GList *cursor = NULL; @@ -1133,7 +1076,7 @@ API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle, pims_ipc_server_d while (cursor) { pims_ipc_server_disconnected_cb_t *disconnected = cursor->data; if (disconnected && disconnected->handle == handle) { - ERROR("Already set callback"); + ERR("Already set callback"); pthread_mutex_unlock(&__disconnect_cb_mutex); return -1; } @@ -1145,10 +1088,10 @@ API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle, pims_ipc_server_d pims_ipc_server_disconnected_cb_t *disconnected = NULL; disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t)); if (NULL == disconnected) { - ERROR("Calloc() Fail"); + ERR("calloc() Fail"); return -1; } - DEBUG("add disconnected"); + DBG("add disconnected"); disconnected->handle = handle; disconnected->callback = callback; disconnected->user_data = user_data; @@ -1171,7 +1114,7 @@ API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle) if (disconnected && disconnected->handle == handle) { free(disconnected); disconnected_list = g_list_delete_link(disconnected_list, cursor); - DEBUG("remove disconnected_cb"); + DBG("remove disconnected_cb"); break; } cursor = g_list_next(cursor); @@ -1191,7 +1134,8 @@ API int pims_ipc_unset_server_disconnected_cb() return 0; } -API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data) +API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, + void *user_data) { pthread_mutex_lock(&__disconnect_cb_mutex); _server_disconnected_cb.callback = callback; diff --git a/src/pims-socket.c b/src/pims-socket.c index 6530945..db41d15 100644 --- a/src/pims-socket.c +++ b/src/pims-socket.c @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -18,55 +18,125 @@ #include #include +#include #include #include #include +#include #include +#include #include +#include +#include #include "pims-internal.h" -#include "pims-debug.h" +#include "pims-ipc-data-internal.h" +#include "pims-ipc-worker.h" +#include "pims-ipc-utils.h" #include "pims-socket.h" -#define MAX_ARRAY_LEN 65535 +#define MAX_ARRAY_LEN 65535 +#define PIMS_WAIT_MSEC 1000 -int socket_send(int fd, char *buf, int len) +static int _get_pid(int fd) { - if (!buf || len <= 0) { - INFO("No data to send %p, %d", buf, len); + int ret = 0; + struct ucred uc = {0}; + socklen_t uc_len = sizeof(uc); + ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &uc_len); + if (ret < 0) { + ERR("getsockopt() Fail(%d)", errno); return -1; } + return uc.pid; +} +static bool _is_send_block(int fd) +{ + int ret = 0; + int queue_size = 0; + ioctl(fd, TIOCOUTQ, &queue_size); + + int buf_size = 0; + int rn = sizeof(int); + ret = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, (socklen_t *)&rn); + if (ret < 0) { + ERR("getsockopt() Fail(%d)", errno); + DBG("remain size(%d)", queue_size); + return false; + } + if (buf_size < queue_size) { + DBG("send : buffer size(%d) < queue size(%d)", buf_size, queue_size); + return true; + } + return false; +} + +static int _sub_timespec_in_msec(struct timespec *st) +{ + struct timespec et = {0}; + clock_gettime(CLOCK_REALTIME, &et); + + /* 3 digits for sec, 3 digits for msec */ + int s_msec = ((st->tv_sec % 1000) * 1000) + (st->tv_nsec / 1000000); + int e_msec = ((et.tv_sec % 1000) * 1000) + (et.tv_nsec / 1000000); + return e_msec - s_msec; +} + +int socket_send(int fd, char *buf, int len) +{ int length = len; int passed_len = 0; int write_len = 0; + bool retry = false; + struct timespec st = {0}; + + RETV_IF(NULL == buf, -1); + RETVM_IF(len <= 0, -1, "Invalid length(%d)", len); + while (length > 0) { - passed_len = send(fd, (const void *)buf, length, MSG_NOSIGNAL); + passed_len = send(fd, (const void *)buf, length, MSG_DONTWAIT | MSG_NOSIGNAL); if (passed_len == -1) { - if (errno == EINTR){ - ERROR("EINTR error. send retry"); + if (errno == EINTR) { + ERR("EINTR error. send retry"); continue; - } - else if (errno == EAGAIN) { - ERROR("EAGAIN error. send retry"); + } else if (errno == EAGAIN) { /* same as EWOULDBLOCK */ + if (false == retry) { + clock_gettime(CLOCK_REALTIME, &st); + retry = true; + } else { + int diff_msec = _sub_timespec_in_msec(&st); + if (PIMS_WAIT_MSEC < diff_msec) { + ERR("EAGAIN error. send retry"); + DBG("send timestamp (%d.%d)sec and wait (%d)msec", st.tv_sec, st.tv_nsec, diff_msec); + int pid = _get_pid(fd); + if (true == _is_send_block(fd)) { + DBG("send blocked. kill fd(%d) pid(%d)", fd, pid); + return -1; + } else { + DBG("reset timeout and wait (%d)msec", PIMS_WAIT_MSEC); + clock_gettime(CLOCK_REALTIME, &st); + } + } + } continue; } - else if (errno == EWOULDBLOCK) { - ERROR("EWOULDBLOCK error. send retry"); - continue; - } - ERROR("send error [%d]", errno); + ERR("send error [%d]", errno); break; - } else if (passed_len == 0) + } else if (passed_len == 0) { break; + } + length -= passed_len; buf += passed_len; + + retry = false; } write_len = len - length; if (write_len != len) { - WARNING("WARNING: buf_size [%d] != write_len[%d]", len, write_len); + WARN("WARN: buf_size [%d] != write_len[%d]", len, write_len); return -1; } VERBOSE("write_len [%d]", write_len); @@ -76,16 +146,13 @@ int socket_send(int fd, char *buf, int len) int socket_recv(int fd, void **buf, unsigned int len) { - if (!buf) { - INFO("Buffer must not null"); - return -1; - } - unsigned int length = len; int read_len = 0; int final_len = 0; char *temp = *buf; + RETV_IF(NULL == *buf, -1); + while (length > 0) { read_len = read(fd, (void *)temp, length); if (read_len < 0) { @@ -96,12 +163,12 @@ int socket_recv(int fd, void **buf, unsigned int len) else if (errno == EWOULDBLOCK) continue; else if (errno == EPIPE) { - ERROR("connection closed : read err %d", errno, read_len, length); + ERR("connection closed : read err %d", errno, read_len, length); free(*buf); *buf = NULL; return 0; /* connection closed */ } - ERROR("read err %d, read_len :%d, length : %d", errno, read_len, length); + ERR("read err %d, read_len :%d, length : %d", errno, read_len, length); final_len = read_len; break; } else if (read_len == 0) @@ -115,11 +182,11 @@ int socket_recv(int fd, void **buf, unsigned int len) final_len = (len-length); if (len != final_len) { - WARNING("WARNING: buf_size [%d] != read_len[%d]\n", read_len, final_len); + WARN("WARN: buf_size [%d] != read_len[%d]\n", read_len, final_len); return -1; } - ((char*)*buf)[len]= '\0'; + ((char*)*buf)[len] = '\0'; return final_len; } @@ -140,7 +207,7 @@ int socket_send_data(int fd, char *buf, unsigned int len) ret = socket_send(fd, (buf+send_len), remain_len); if (ret < 0) { - ERROR("socket_send error"); + ERR("socket_send error"); break; } send_len += ret; @@ -148,7 +215,7 @@ int socket_send_data(int fd, char *buf, unsigned int len) } if (ret < 0) { - ERROR("socket_send error"); + ERR("socket_send error"); return -1; } @@ -157,11 +224,10 @@ int socket_send_data(int fd, char *buf, unsigned int len) int write_command(int fd, const uint64_t cmd) { - // poll : Level Trigger uint64_t clear_cmd = 0; int ret = write(fd, &clear_cmd, sizeof(clear_cmd)); if (ret < 0) - ERROR("write fail (%d)", ret); + ERR("write fail (%d)", ret); return write(fd, &cmd, sizeof(cmd)); } @@ -170,9 +236,368 @@ int read_command(int fd, uint64_t *cmd) { uint64_t dummy; int len = TEMP_FAILURE_RETRY(read(fd, &dummy, sizeof(dummy))); - if (len == sizeof(dummy)) { + if (len == sizeof(dummy)) *cmd = dummy; - } + return len; } + +/* + * if delete = TRUE, steal client_id, then free(client_id) + * if delete = FALSE, return client_id pointer, then do no call free(client_id + */ +static char* __find_client_id(pims_ipc_svc_s *ipc_svc, int client_fd, int delete) +{ + char *client_id; + GList *cursor = NULL; + pims_ipc_client_map_s *client; + + cursor = g_list_first(ipc_svc->client_id_fd_map); + while (cursor) { + client = cursor->data; + if (client && client->fd == client_fd) { + client_id = client->id; + if (delete) { + client->id = NULL; + ipc_svc->client_id_fd_map = g_list_delete_link(ipc_svc->client_id_fd_map, + cursor); + free(client); + } + return client_id; + } + cursor = cursor->next; + } + return NULL; +} + + +static int __send_identify(int fd, unsigned int seq_no, char *id, int id_len) +{ + int total_len, length = 0; + + total_len = sizeof(total_len) + sizeof(id_len) + id_len + sizeof(seq_no); + + char buf[total_len+1]; + memset(buf, 0x0, total_len+1); + + memcpy(buf, &total_len, sizeof(total_len)); + length += sizeof(total_len); + + memcpy(buf+length, &id_len, sizeof(id_len)); + length += sizeof(id_len); + memcpy(buf+length, id, id_len); + length += id_len; + + memcpy(buf+length, &(seq_no), sizeof(seq_no)); + length += sizeof(seq_no); + + return socket_send(fd, buf, length); +} + +static int __recv_raw_data(int fd, pims_ipc_raw_data_s **data, int *init) +{ + int len = 0; + pims_ipc_raw_data_s *temp; + + /* read the size of message. note that ioctl is non-blocking */ + if (ioctl(fd, FIONREAD, &len)) { + ERR("ioctl() Fail(%d)", errno); + return -1; + } + + /* when server or client closed socket */ + if (len == 0) { + INFO("[IPC Socket] connection is closed"); + return 0; + } + + temp = calloc(1, sizeof(pims_ipc_raw_data_s)); + if (NULL == temp) { + ERR("calloc() Fail(%d)", errno); + return -1; + } + temp->client_id = NULL; + temp->client_id_len = 0; + temp->call_id = NULL; + temp->call_id_len = 0; + temp->seq_no = 0; + temp->has_data = FALSE; + temp->data = NULL; + temp->data_len = 0; + + int ret = 0; + int read_len = 0; + unsigned int total_len = 0; + unsigned int has_data = FALSE; + + do { + ret = TEMP_FAILURE_RETRY(read(fd, &total_len, sizeof(total_len))); + if (-1 == ret) { + ERR("read() Fail(%d)", errno); + break; + } + read_len += ret; + + ret = TEMP_FAILURE_RETRY(read(fd, &(temp->client_id_len), sizeof(temp->client_id_len))); + if (-1 == ret) { + ERR("read() Fail(%d)", errno); + break; + } + read_len += ret; + + temp->client_id = calloc(1, temp->client_id_len+1); + if (NULL == temp->client_id) { + ERR("calloc() Fail"); + return -1; + } + ret = socket_recv(fd, (void *)&(temp->client_id), temp->client_id_len); + if (ret < 0) { + ERR("socket_recv() Fail(%d)", ret); + break; + } + read_len += ret; + + ret = TEMP_FAILURE_RETRY(read(fd, &(temp->seq_no), sizeof(temp->seq_no))); + if (ret < 0) { + ERR("read() Fail(%d)", ret); + break; + } + read_len += ret; + + if (total_len == read_len) { + *data = temp; + *init = TRUE; + return read_len; + } + + ret = TEMP_FAILURE_RETRY(read(fd, &(temp->call_id_len), sizeof(temp->call_id_len))); + if (ret < 0) { + ERR("read() Fail(%d)", errno); + break; + } + read_len += ret; + + temp->call_id = calloc(1, temp->call_id_len+1); + ret = socket_recv(fd, (void *)&(temp->call_id), temp->call_id_len); + if (ret < 0) { + ERR("socket_recv() Fail(%d)", ret); + break; + } + read_len += ret; + + ret = TEMP_FAILURE_RETRY(read(fd, &has_data, sizeof(has_data))); + if (ret < 0) { + ERR("read() Fail(%d)", errno); + break; + } + read_len += ret; + + if (has_data) { + temp->has_data = TRUE; + ret = TEMP_FAILURE_RETRY(read(fd, &(temp->data_len), sizeof(temp->data_len))); + if (ret < 0) { + ERR("read() Fail(%d)", errno); + break; + } + read_len += ret; + + temp->data = calloc(1, temp->data_len+1); + ret = socket_recv(fd, (void *)&(temp->data), temp->data_len); + if (ret < 0) { + ERR("socket_recv() Fail"); + break; + } + read_len += ret; + } + + INFO("client_id : %s, call_id : %s, seq_no : %d", temp->client_id, temp->call_id, + temp->seq_no); + + *data = temp; + *init = FALSE; + } while (0); + + if (ret < 0) { + ERR("total_len(%d) client_id_len(%d)", total_len, temp->client_id_len); + worker_free_raw_data(temp); + *data = NULL; + *init = FALSE; + return -1; + } + + return read_len; +} + +static gboolean __process_init_request(int client_fd, pims_ipc_raw_data_s *req, + pims_ipc_svc_s *ipc_svc) +{ + int ret; + pims_ipc_client_map_s *client; + + client = calloc(1, sizeof(pims_ipc_client_map_s)); + if (NULL == client) { + ERR("calloc() Fail(%d)", errno); + return FALSE; + } + client->fd = client_fd; + client->id = req->client_id; + + req->client_id = NULL; + ipc_svc->client_id_fd_map = g_list_append(ipc_svc->client_id_fd_map, client); + + worker_start_idle_worker(ipc_svc); + + /* send server pid to client */ + char temp[100]; + snprintf(temp, sizeof(temp), "%d_%x", client_get_unique_sequence_number(), getpid()); + ret = __send_identify(client_fd, req->seq_no, temp, strlen(temp)); + + worker_free_raw_data(req); + if (-1 == ret) { + ERR("__send_identify() Fail"); + return FALSE; + } + + return TRUE; +} + +static gboolean __process_request(int client_fd, pims_ipc_raw_data_s *req, + pims_ipc_svc_s *ipc_svc) +{ + char *client_id = NULL; + pims_ipc_worker_data_s *worker_data; + + client_id = __find_client_id(ipc_svc, client_fd, FALSE); + if (NULL == client_id) { + ERR("__find_client_id(%d) Fail", client_fd); + return FALSE; + } + + if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_CREATE, req->call_id)) { + worker_data = worker_get_idle_worker(ipc_svc, client_id); + if (NULL == worker_data) { + ERR("worker_get_idle_worker() Fail"); + return FALSE; + } + if (!worker_data->fd) { + int ret = worker_wait_idle_worker_ready(worker_data); + if (ret < 0) + return FALSE; + } + } else { + worker_data = worker_find(ipc_svc, client_id); + } + + if (worker_data) { + worker_push_raw_data(worker_data, client_fd, req); + write_command(worker_data->fd, 1); + } else { + ERR("worker_find(%s) Fail[client_fd(%d)]", client_id, client_fd); + } + return TRUE; +} + +static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data) +{ + int client_fd; + char *client_id = NULL; + pims_ipc_svc_s *ipc_svc = data; + + RETV_IF(NULL == data, FALSE); + + client_fd = g_io_channel_unix_get_fd(src); + + if (G_IO_HUP & condition) { + INFO("client closed: client_fd(%d)", client_fd); + /* Find client_id */ + client_id = __find_client_id(ipc_svc, client_fd, TRUE); + if (client_id) { + worker_stop_client_worker(ipc_svc, client_id); + free(client_id); + } + + close(client_fd); + return FALSE; + } + + /* receive data from client */ + int recv_len; + int init = FALSE; + pims_ipc_raw_data_s *req = NULL; + + recv_len = __recv_raw_data(client_fd, &req, &init); + if (0 < recv_len) { + if (init) + return __process_init_request(client_fd, req, ipc_svc); + + return __process_request(client_fd, req, ipc_svc); + } else { + ERR("receive invalid : %d", client_fd); + close(client_fd); + return FALSE; + } +} + + +static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data) +{ + int sockfd; + GIOChannel *channel; + int client_sockfd = -1; + struct sockaddr_un clientaddr; + socklen_t client_len = sizeof(clientaddr); + pims_ipc_svc_s *ipc_svc = data; + + sockfd = ipc_svc->sockfd; + + client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len); + if (-1 == client_sockfd) { + char *errmsg = NULL; + char buf[1024] = {0}; + errmsg = strerror_r(errno, buf, sizeof(buf)); + if (errmsg) + ERR("accept error : %s", errmsg); + + return TRUE; + } + + channel = g_io_channel_unix_new(client_sockfd); + g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data); + g_io_channel_unref(channel); + + return TRUE; +} + +void socket_set_handler(void *user_data) +{ + int ret; + struct sockaddr_un addr; + GIOChannel *gio = NULL; + pims_ipc_svc_s *ipc_svc = user_data; + + ret = sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, ipc_svc->service, 0); + if (sd_listen_fds(1) == 1 && 0 < ret) { + ipc_svc->sockfd = SD_LISTEN_FDS_START; + } else { + unlink(ipc_svc->service); + ipc_svc->sockfd = socket(PF_UNIX, SOCK_STREAM, 0); + + bzero(&addr, sizeof(addr)); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service); + + ret = bind(ipc_svc->sockfd, (struct sockaddr *)&addr, sizeof(addr)); + if (ret != 0) + ERR("bind() Fail(%d)", errno); + ret = listen(ipc_svc->sockfd, 30); + + ret = chown(ipc_svc->service, getuid(), ipc_svc->group); + ret = chmod(ipc_svc->service, ipc_svc->mode); + } + + gio = g_io_channel_unix_new(ipc_svc->sockfd); + + g_io_add_watch(gio, G_IO_IN, __socket_handler, ipc_svc); +} + diff --git a/src/pims-socket.h b/src/pims-socket.h index d04d1d7..523a517 100644 --- a/src/pims-socket.h +++ b/src/pims-socket.h @@ -1,7 +1,7 @@ /* * PIMS IPC * - * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. @@ -16,20 +16,30 @@ * limitations under the License. */ - #ifndef __PIMS_SOCKET_H__ #define __PIMS_SOCKET_H__ -#include #include -#include -#include + +#include "pims-ipc-data-internal.h" #ifdef __cplusplus extern "C" { #endif +typedef struct { + int client_fd; + int request_count; + GList *raw_data; /* pims_ipc_raw_data_s list */ + pthread_mutex_t raw_data_mutex; +} pims_ipc_request_s; + +typedef struct { + int fd; + char *id; +} pims_ipc_client_map_s; + int socket_send(int fd, char *buf, int len); int socket_recv(int fd, void **buf, unsigned int len); int socket_send_data(int fd, char *buf, unsigned int len); @@ -37,6 +47,9 @@ int socket_send_data(int fd, char *buf, unsigned int len); int write_command(int fd, const uint64_t cmd); int read_command(int fd, uint64_t *cmd); +void socket_remove_client_fd_map(pims_ipc_svc_s *ipc_svc); +void socket_set_handler(void *user_data); + #ifdef __cplusplus } #endif -- 2.7.4