From e3ba65ad7b66324f97f01a5061d1d7df818e31a1 Mon Sep 17 00:00:00 2001 From: Jinkun Jang Date: Wed, 13 Mar 2013 01:51:21 +0900 Subject: [PATCH] Tizen 2.1 base --- CMakeLists.txt | 54 ++ LICENSE.APLv2 | 204 ++++++++ NOTICE | 1 + include/pims-ipc-data.h | 52 ++ include/pims-ipc-svc.h | 46 ++ include/pims-ipc-types.h | 63 +++ include/pims-ipc.h | 48 ++ packaging/pims-ipc.spec | 59 +++ pims-ipc.manifest | 12 + pims-ipc.pc.in | 13 + src/pims-debug.h | 83 ++++ src/pims-internal.h | 77 +++ src/pims-ipc-data.c | 503 +++++++++++++++++++ src/pims-ipc-svc.c | 1242 ++++++++++++++++++++++++++++++++++++++++++++++ src/pims-ipc.c | 813 ++++++++++++++++++++++++++++++ src/pims-socket.c | 286 +++++++++++ src/pims-socket.h | 42 ++ test/CMakeLists.txt | 44 ++ test/sock-test.c | 178 +++++++ test/test.c | 586 ++++++++++++++++++++++ 20 files changed, 4406 insertions(+) create mode 100755 CMakeLists.txt create mode 100644 LICENSE.APLv2 create mode 100644 NOTICE create mode 100644 include/pims-ipc-data.h create mode 100644 include/pims-ipc-svc.h create mode 100644 include/pims-ipc-types.h create mode 100644 include/pims-ipc.h create mode 100644 packaging/pims-ipc.spec create mode 100644 pims-ipc.manifest create mode 100755 pims-ipc.pc.in create mode 100644 src/pims-debug.h create mode 100644 src/pims-internal.h create mode 100644 src/pims-ipc-data.c create mode 100644 src/pims-ipc-svc.c create mode 100644 src/pims-ipc.c create mode 100644 src/pims-socket.c create mode 100644 src/pims-socket.h create mode 100755 test/CMakeLists.txt create mode 100644 test/sock-test.c create mode 100644 test/test.c diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100755 index 0000000..4041dfc --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,54 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.6) +PROJECT(pims-ipc C) + +#IF("${CMAKE_BUILD_TYPE}" STREQUAL "") +# SET(CMAKE_BUILD_TYPE "Release") +#ENDIF("${CMAKE_BUILD_TYPE}" STREQUAL "") +#MESSAGE("Build type: ${CMAKE_BUILD_TYPE}") +#SET(CMAKE_BUILD_TYPE "Debug") + +SET(DEST_INCLUDE_DIR "include/pims-ipc") +SET(SRC_INCLUDE_DIR "${CMAKE_SOURCE_DIR}/include") + +SET(PREFIX ${CMAKE_INSTALL_PREFIX}) +SET(EXEC_PREFIX "\${prefix}") +SET(LIBDIR "\${prefix}/lib") +SET(INCLUDEDIR "\${prefix}/${DEST_INCLUDE_DIR}") +SET(VERSION_MAJOR 0) +SET(VERSION "${VERSION_MAJOR}.0.1") + +INCLUDE_DIRECTORIES(${SRC_INCLUDE_DIR}) +#SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} -I${CMAKE_SOURCE_DIR}/src -I${SRC_INCLUDE_DIR} -D_NON_SLP") +SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} -I${CMAKE_SOURCE_DIR}/src -I${SRC_INCLUDE_DIR}") + +FILE(GLOB SRCS src/*.c) + +INCLUDE(FindPkgConfig) +#pkg_check_modules(pkgs REQUIRED glib-2.0 gthread-2.0 libzmq) +pkg_check_modules(pkgs REQUIRED glib-2.0 gthread-2.0 dlog libsystemd-daemon libzmq) + +FOREACH(flag ${pkgs_CFLAGS}) + SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} ${flag}") +ENDFOREACH(flag) + +SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} -fvisibility=hidden") + +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${EXTRA_CFLAGS}") + +ADD_DEFINITIONS("-DPREFIX=\"${PREFIX}\"") + +ADD_LIBRARY(${PROJECT_NAME} SHARED ${SRCS}) +SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES SOVERSION ${VERSION_MAJOR}) +SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES VERSION ${VERSION}) +TARGET_LINK_LIBRARIES(${PROJECT_NAME} ${pkgs_LDFLAGS}) + +CONFIGURE_FILE(${PROJECT_NAME}.pc.in ${PROJECT_NAME}.pc @ONLY) +SET_DIRECTORY_PROPERTIES(PROPERTIES ADDITIONAL_MAKE_CLEAN_FILES "${PROJECT_NAME}.pc") + +INSTALL(TARGETS ${PROJECT_NAME} DESTINATION lib) +INSTALL(FILES ${PROJECT_NAME}.pc DESTINATION lib/pkgconfig) + +FILE(GLOB HEADER_FILES ${SRC_INCLUDE_DIR}/*.h) +INSTALL(FILES ${HEADER_FILES} DESTINATION ${DEST_INCLUDE_DIR}) + +ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/test) diff --git a/LICENSE.APLv2 b/LICENSE.APLv2 new file mode 100644 index 0000000..bae7f54 --- /dev/null +++ b/LICENSE.APLv2 @@ -0,0 +1,204 @@ +Copyright (c) 2000 - 2012 Samsung Electronics Co., Ltd. All rights reserved. + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..4c49449 --- /dev/null +++ b/NOTICE @@ -0,0 +1 @@ +Copyright (c) 2000 - 2012 Samsung Electronics Co., Ltd. All rights reserved. diff --git a/include/pims-ipc-data.h b/include/pims-ipc-data.h new file mode 100644 index 0000000..e3bc131 --- /dev/null +++ b/include/pims-ipc-data.h @@ -0,0 +1,52 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_IPC_DATA_H__ +#define __PIMS_IPC_DATA_H__ + +#include + +#ifdef _cplusplus +extern "C" +{ +#endif + +#define pims_ipc_data_create(flags) pims_ipc_data_create_with_size(1024, (flags)) + +pims_ipc_data_h pims_ipc_data_create_with_size(unsigned int size, int flags); +void pims_ipc_data_destroy(pims_ipc_data_h ipc); +int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size); +void* pims_ipc_data_get(pims_ipc_data_h data, unsigned int *size); +void* pims_ipc_data_get_dup(pims_ipc_data_h data, unsigned int *size); +int pims_ipc_data_put_with_type(pims_ipc_data_h data, pims_ipc_data_type_e type, void *buf, unsigned int size); +void* pims_ipc_data_get_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size); +void* pims_ipc_data_get_dup_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size); + +void* pims_ipc_data_marshal(pims_ipc_data_h data, unsigned int *size); +int pims_ipc_data_marshal_with_zmq(pims_ipc_data_h data, zmq_msg_t *pzmsg); +void* pims_ipc_data_marshal_dup(pims_ipc_data_h data, unsigned int *size); +pims_ipc_data_h pims_ipc_data_unmarshal(void *buf, unsigned int size); +pims_ipc_data_h pims_ipc_data_unmarshal_with_zmq(zmq_msg_t *pzmsg); +pims_ipc_data_h pims_ipc_data_unmarshal_dup(void *buf, unsigned int size); + +#ifdef _cplusplus +} +#endif + +#endif /* __PIMS_IPC_DATA_H__ */ diff --git a/include/pims-ipc-svc.h b/include/pims-ipc-svc.h new file mode 100644 index 0000000..0f85bb8 --- /dev/null +++ b/include/pims-ipc-svc.h @@ -0,0 +1,46 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_IPC_SVC_H__ +#define __PIMS_IPC_SVC_H__ + +#include +#include + +#ifdef _cplusplus +extern "C" +{ +#endif + +int pims_ipc_svc_init(char *service, gid_t group, mode_t mode); +int pims_ipc_svc_deinit(void); +int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata); + +int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode); +int pims_ipc_svc_deinit_for_publish(void); +int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data); + +void pims_ipc_svc_run_main_loop(GMainLoop* main_loop); + +#ifdef _cplusplus +} +#endif + +#endif /*__PIMS_IPC_IMPL_H__*/ + diff --git a/include/pims-ipc-types.h b/include/pims-ipc-types.h new file mode 100644 index 0000000..3244c34 --- /dev/null +++ b/include/pims-ipc-types.h @@ -0,0 +1,63 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_IPC_TYPES_H__ +#define __PIMS_IPC_TYPES_H__ + +#include +#include +#include + + +#ifdef _cplusplus +extern "C" +{ +#endif + +#define PIMS_IPC_DATA_FLAGS_NONE 0x00000000 +#define PIMS_IPC_DATA_FLAGS_WITH_TYPE 0x00000001 + +typedef void* pims_ipc_h; +typedef void* pims_ipc_data_h; + +typedef enum { + PIMS_IPC_DATA_TYPE_INVALID, + PIMS_IPC_DATA_TYPE_CHAR, + PIMS_IPC_DATA_TYPE_UCHAR, + PIMS_IPC_DATA_TYPE_INT, + PIMS_IPC_DATA_TYPE_UINT, + PIMS_IPC_DATA_TYPE_LONG, + PIMS_IPC_DATA_TYPE_ULONG, + PIMS_IPC_DATA_TYPE_FLOAT, + PIMS_IPC_DATA_TYPE_DOUBLE, + PIMS_IPC_DATA_TYPE_STRING, + PIMS_IPC_DATA_TYPE_MEMORY, +} pims_ipc_data_type_e; + +typedef void (*pims_ipc_svc_call_cb)(pims_ipc_h ipc, pims_ipc_data_h data_in, + pims_ipc_data_h *data_out, void *userdata); +typedef void (*pims_ipc_call_async_cb)(pims_ipc_h ipc, pims_ipc_data_h data_out, void *userdata); +typedef void (*pims_ipc_subscribe_cb)(pims_ipc_h ipc, pims_ipc_data_h data, void *userdata); + +#ifdef _cplusplus +} +#endif + +#endif /* __PIMS_IPC_TYPES_H__ */ + diff --git a/include/pims-ipc.h b/include/pims-ipc.h new file mode 100644 index 0000000..f192ec9 --- /dev/null +++ b/include/pims-ipc.h @@ -0,0 +1,48 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_IPC_H__ +#define __PIMS_IPC_H__ + +#include + +#ifdef _cplusplus +extern "C" +{ +#endif + +pims_ipc_h pims_ipc_create(char *service); +void pims_ipc_destroy(pims_ipc_h ipc); +int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, + pims_ipc_data_h *data_out); +int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, + pims_ipc_call_async_cb callback, void *userdata); +bool pims_ipc_is_call_in_progress(pims_ipc_h ipc); + +pims_ipc_h pims_ipc_create_for_subscribe(char *service); +void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc); +int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata); +int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event); + +#ifdef _cplusplus +} +#endif + +#endif /*__PIMS_IPC_H__*/ + diff --git a/packaging/pims-ipc.spec b/packaging/pims-ipc.spec new file mode 100644 index 0000000..9cc428d --- /dev/null +++ b/packaging/pims-ipc.spec @@ -0,0 +1,59 @@ +Name: pims-ipc +Summary: library for PIMs IPC +Version: 0.0.20 +Release: 1 +Group: System/Libraries +License: Apache 2.0 +Source0: %{name}-%{version}.tar.gz +Requires(post): /sbin/ldconfig +Requires(post): /usr/bin/sqlite3 +Requires(postun): /sbin/ldconfig + +BuildRequires: cmake +BuildRequires: pkgconfig(glib-2.0) +BuildRequires: pkgconfig(dlog) +BuildRequires: pkgconfig(libsystemd-daemon) +BuildRequires: pkgconfig(libzmq) + +%description +library for PIMs IPC + +%package devel +Summary: DB library for calendar +Group: Development/Libraries +Requires: %{name} = %{version}-%{release} + +%description devel +library for PIMs IPC (developement files) + +%prep +%setup -q + + +%build +cmake . -DCMAKE_INSTALL_PREFIX=%{_prefix} + + +make %{?jobs:-j%jobs} + +%install +%make_install + + +%post +/sbin/ldconfig + +%postun -p /sbin/ldconfig + + +%files +%manifest pims-ipc.manifest +%defattr(-,root,root,-) +%{_libdir}/libpims-ipc.so.* + +%files devel +%defattr(-,root,root,-) +%{_includedir}/pims-ipc/*.h +%{_libdir}/*.so +%{_libdir}/pims_ipc_test +%{_libdir}/pkgconfig/pims-ipc.pc diff --git a/pims-ipc.manifest b/pims-ipc.manifest new file mode 100644 index 0000000..44e6873 --- /dev/null +++ b/pims-ipc.manifest @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/pims-ipc.pc.in b/pims-ipc.pc.in new file mode 100755 index 0000000..e277e4d --- /dev/null +++ b/pims-ipc.pc.in @@ -0,0 +1,13 @@ +# Package Information for pkg-config + +prefix=@PREFIX@ +exec_prefix=@EXEC_PREFIX@ +libdir=@LIBDIR@ +includedir=@INCLUDEDIR@ + +Name: @PROJECT_NAME@ +Description: @PROJECT_NAME@ library +Version: @VERSION@ +Requires: glib-2.0 libzmq +Libs: -L${libdir} -l@PROJECT_NAME@ +Cflags: -I${includedir} diff --git a/src/pims-debug.h b/src/pims-debug.h new file mode 100644 index 0000000..0876864 --- /dev/null +++ b/src/pims-debug.h @@ -0,0 +1,83 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_DEBUG_H__ +#define __PIMS_DEBUG_H__ + +#ifndef _NON_SLP +#include +#endif +#include + +#ifdef _cplusplus +extern "C" +{ +#endif + +/* Tag defines */ +#define TAG_IPC "PIMS_IPC" + +/* debug base macro */ +#ifndef _NON_SLP +#define __ug_log(logtype, tag, frmt, args...) \ + do {LOG(logtype, tag, "[%s:%d] "frmt"\n", __FUNCTION__, __LINE__, ##args);} while (0) +#else +#define LOG_VERBOSE "VERBOSE" +#define LOG_DEBUG "DEBUG" +#define LOG_INFO "INFO" +#define LOG_WARN "WARN" +#define LOG_ERROR "ERROR" +#define __ug_log(logtype, tag, frmt, args...) \ + do {printf("[%s][%s][%08x][%s:%d] "frmt"\n", logtype, tag, (unsigned int)pthread_self(), __FUNCTION__, __LINE__, ##args);} while (0) +#endif + +#define pims_verbose(tag, frmt, args...) __ug_log(LOG_VERBOSE, tag, frmt, ##args) +#define pims_debug(tag, frmt, args...) __ug_log(LOG_DEBUG, tag, frmt, ##args) +#define pims_info(tag, frmt, args...) __ug_log(LOG_INFO, tag, frmt, ##args) +#define pims_warn(tag, frmt, args...) __ug_log(LOG_WARN, tag, frmt, ##args) +#define pims_error(tag, frmt, args...) __ug_log(LOG_ERROR, tag, frmt, ##args) + +#ifndef TAG_NAME // SET default TAG +#define TAG_NAME TAG_IPC +#endif + +#define PIMS_VERBOSE_TAG(frmt, args...) pims_verbose(TAG_NAME, frmt, ##args); +#define PIMS_DEBUG_TAG(frmt, args...) pims_debug (TAG_NAME, frmt, ##args); +#define PIMS_INFO_TAG(frmt, args...) pims_info (TAG_NAME, frmt, ##args); +#define PIMS_WARN_TAG(frmt, args...) pims_warn (TAG_NAME, frmt, ##args); +#define PIMS_ERROR_TAG(frmt, args...) pims_error (TAG_NAME, frmt, ##args); + +#define VERBOSE(frmt, args...) PIMS_VERBOSE_TAG(frmt, ##args) +#define DEBUG(frmt, args...) PIMS_DEBUG_TAG(frmt, ##args) +#define INFO(frmt, args...) PIMS_INFO_TAG(frmt, ##args) +#define WARNING(frmt, args...) PIMS_WARN_TAG(frmt, ##args) +#define ERROR(frmt, args...) PIMS_ERROR_TAG(frmt, ##args) + +#define ASSERT(expr) \ + if (!(expr)) \ + { \ + ERROR("Assertion %s", #expr); \ + } \ + assert(expr) + +#ifdef _cplusplus +} +#endif + +#endif /* __PIMS_DEBUG_H__ */ diff --git a/src/pims-internal.h b/src/pims-internal.h new file mode 100644 index 0000000..98de999 --- /dev/null +++ b/src/pims-internal.h @@ -0,0 +1,77 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_INTERNAL_H__ +#define __PIMS_INTERNAL_H__ + +#include + +#ifdef _cplusplus +extern "C" +{ +#endif + +#ifndef API +#define API __attribute__ ((visibility("default"))) +#endif + +#define PIMS_IPC_MONITOR_PATH "monitor" +#define PIMS_IPC_DEALER_PATH "dealer" +#define PIMS_IPC_MANAGER_PATH "manager" +#define PIMS_IPC_MONITOR2_PATH "monitor2" +#define PIMS_IPC_MODULE_INTERNAL "pims_ipc_internal" +#define PIMS_IPC_FUNCTION_CREATE "create" +#define PIMS_IPC_FUNCTION_DESTROY "destroy" +#define PIMS_IPC_CALL_ID_CREATE PIMS_IPC_MODULE_INTERNAL ":" PIMS_IPC_FUNCTION_CREATE +#define PIMS_IPC_CALL_ID_DESTROY PIMS_IPC_MODULE_INTERNAL ":" PIMS_IPC_FUNCTION_DESTROY +#define PIMS_IPC_MAKE_CALL_ID(module, function) g_strdup_printf("%s:%s", module, function) + +static inline int _pims_zmq_msg_recv(zmq_msg_t *msg, void *socket, int flags) +{ + int ret = -1; + + while (1) + { + ret = zmq_msg_recv(msg, socket, flags); + if (ret == -1 && errno == EINTR) + continue; + break; + } + return ret; +} + +static inline int _pims_zmq_msg_send(zmq_msg_t *msg, void *socket, int flags) +{ + int ret = -1; + + while (1) + { + ret = zmq_msg_send(msg, socket, flags); + if (ret == -1 && errno == EINTR) + continue; + break; + } + return ret; +} + +#ifdef _cplusplus +} +#endif + +#endif /* __PIMS_INTERNAL_H__ */ diff --git a/src/pims-ipc-data.c b/src/pims-ipc-data.c new file mode 100644 index 0000000..86d39be --- /dev/null +++ b/src/pims-ipc-data.c @@ -0,0 +1,503 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +typedef struct +{ + unsigned int alloc_size; + unsigned int buf_size; + unsigned int free_size; + zmq_msg_t zmsg; + char *pos; + char *buf; + int flags; + unsigned int created:1; + unsigned int zmsg_alloced:1; + unsigned int buf_alloced:1; +} pims_ipc_data_t; + +/* + Structure of data with type(4 bytes order) + +------------------------------------------------------------------+ + | type | size | data | pad | type | size | data | pad | + +------------------------------------------------------------------+ + 4 4 size 0-3 (Size of bytes) + + Structure of data without type(4 bytes order) + +----------------------------------------------------+ + | size | data | pad | size | data | pad | + +----------------------------------------------------+ + 4 size 0-3 (Size of bytes) +*/ + +#define _get_used_size_with_type(data_size) \ + (sizeof(int) + sizeof(int) + data_size + (sizeof(int) - (data_size % sizeof(int)))) + +#define _get_used_size(data_size) \ + (sizeof(int) + data_size + (sizeof(int) - (data_size % sizeof(int)))) + +API pims_ipc_data_h pims_ipc_data_create_with_size(unsigned int size, int flags) +{ + int ret = -1; + pims_ipc_data_t *handle = NULL; + + handle = g_new0(pims_ipc_data_t, 1); + handle->alloc_size = size; + handle->free_size = size; + handle->buf_size = 0; + handle->buf = g_malloc0(size); + handle->pos = handle->buf; + handle->created = 1; + handle->buf_alloced = 1; + + ret = pims_ipc_data_put(handle, &flags, sizeof(int)); + ASSERT(ret == 0); + handle->flags = flags; + + return handle; +} + +API void pims_ipc_data_destroy(pims_ipc_data_h data) +{ + pims_ipc_data_t *handle = (pims_ipc_data_t *)data; + if (handle->buf_alloced) + { + g_free(handle->buf); + } + if (handle->zmsg_alloced) + { + zmq_msg_close(&handle->zmsg); + } + g_free(handle); +} + +API int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size) +{ + pims_ipc_data_t *handle = NULL; + unsigned int dsize = size; + unsigned int used_size = 0; + handle = (pims_ipc_data_t *)data; + + if (handle->created == 0) + { + ERROR("This handle isn't create mode."); + return -1; + } + if (handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE) + { + ERROR("Not without-type mode"); + return -1; + } + if (dsize > 0 && buf == NULL) + { + ERROR("Invalid argument"); + return -1; + } + + if (handle->free_size < _get_used_size(dsize)) + { + int new_size = 0; + new_size = handle->alloc_size * 2; + + while (new_size < handle->buf_size + _get_used_size(dsize)) + new_size *= 2; + handle->buf = g_realloc(handle->buf, new_size); + handle->alloc_size = new_size; + handle->free_size = handle->alloc_size - handle->buf_size; + + handle->pos = handle->buf; + handle->pos += handle->buf_size; + + VERBOSE("free_size [%d] dsize [%d]", handle->free_size, dsize); + } + + *(int*)(handle->pos) = dsize; + if (dsize > 0) + memcpy((handle->pos+sizeof(int)), buf, dsize); + + used_size = _get_used_size(dsize); + handle->pos += used_size; + handle->buf_size += used_size; + handle->free_size -= used_size; + + VERBOSE("data put size [%u] data[%p]", dsize, buf); + + return 0; +} + +API void* pims_ipc_data_get(pims_ipc_data_h data, unsigned int *size) +{ + pims_ipc_data_t *handle = NULL; + unsigned int dsize = 0; + unsigned int used_size = 0; + void *buf = NULL; + handle = (pims_ipc_data_t *)data; + + if (handle->created == 1) + { + ERROR("This handle is create mode."); + return NULL; + } + if (handle->buf_size == 0) + { + ERROR("Remain buffer size is 0."); + return NULL; + } + if (handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE) + { + ERROR("Not without-type mode"); + return NULL; + } + + dsize = *(int*)(handle->pos); + buf = (handle->pos+sizeof(int)); + + if (dsize == 0) // current position is next size field becasue data size is 0 + buf = NULL; + + used_size = _get_used_size(dsize); + handle->pos += used_size; + handle->buf_size -= used_size; + handle->free_size += used_size; + + VERBOSE("data get size [%u] data[%p]", dsize, buf); + *size = dsize; + return buf; +} + +API void* pims_ipc_data_get_dup(pims_ipc_data_h data, unsigned int *size) +{ + void *buf = NULL; + + buf = pims_ipc_data_get(data, size); + return g_memdup(buf, *size); +} + + +API int pims_ipc_data_put_with_type(pims_ipc_data_h data, pims_ipc_data_type_e type, void *buf, unsigned int size) +{ + pims_ipc_data_t *handle = NULL; + unsigned int dsize = 0; + unsigned int used_size = 0; + handle = (pims_ipc_data_t *)data; + + if (handle->created == 0) + { + ERROR("This handle isn't create mode."); + return -1; + } + if (!(handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE)) + { + ERROR("Not with-type mode"); + return -1; + } + + switch(type) + { + case PIMS_IPC_DATA_TYPE_CHAR: + dsize = sizeof(char); + break; + case PIMS_IPC_DATA_TYPE_UCHAR: + dsize = sizeof(unsigned char); + break; + case PIMS_IPC_DATA_TYPE_INT: + dsize = sizeof(int); + break; + case PIMS_IPC_DATA_TYPE_UINT: + dsize = sizeof(unsigned int); + break; + case PIMS_IPC_DATA_TYPE_LONG: + dsize = sizeof(long); + break; + case PIMS_IPC_DATA_TYPE_ULONG: + dsize = sizeof(unsigned long); + break; + case PIMS_IPC_DATA_TYPE_FLOAT: + dsize = sizeof(float); + break; + case PIMS_IPC_DATA_TYPE_DOUBLE: + dsize = sizeof(double); + break; + case PIMS_IPC_DATA_TYPE_STRING: + if (buf == NULL) + { + dsize = 0; + } + else + { + dsize = strlen(buf) +1; + } + break; + case PIMS_IPC_DATA_TYPE_MEMORY: + dsize = size; + break; + default: + dsize = 0; + break; + } + + if (dsize != 0 && buf == NULL) + return -1; + + if (buf == NULL && dsize == 0 && type != PIMS_IPC_DATA_TYPE_STRING) + return -1; + + if (handle->free_size < _get_used_size_with_type(dsize)) + { + int new_size = 0; + new_size = handle->alloc_size * 2; + + while (new_size < handle->buf_size + _get_used_size_with_type(dsize)) + new_size *= 2; + handle->buf = g_realloc(handle->buf, new_size); + handle->alloc_size = new_size; + handle->free_size = handle->alloc_size - handle->buf_size; + + handle->pos = handle->buf; + handle->pos += handle->buf_size; + + VERBOSE("free_size [%d] dsize [%d]", handle->free_size, dsize); + } + + *(int*)(handle->pos) = type; + *(int*)(handle->pos+sizeof(int)) = dsize; + if (dsize > 0 && buf != NULL) + memcpy((handle->pos+sizeof(int)*2), buf, dsize); + + used_size = _get_used_size_with_type(dsize); + handle->pos += used_size; + handle->buf_size += used_size; + handle->free_size -= used_size; + + VERBOSE("data put type [%d] size [%u] data[%p]", type, dsize, buf); + + return 0; +} + +API void* pims_ipc_data_get_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size) +{ + pims_ipc_data_t *handle = NULL; + pims_ipc_data_type_e dtype = PIMS_IPC_DATA_TYPE_INVALID; + unsigned int dsize = 0; + unsigned int used_size = 0; + void *buf = NULL; + handle = (pims_ipc_data_t *)data; + + if (handle->created == 1) + { + ERROR("This handle is create mode."); + *type = PIMS_IPC_DATA_TYPE_INVALID; + return NULL; + } + if (handle->buf_size == 0) + { + ERROR("Remain buffer size is 0."); + *type = PIMS_IPC_DATA_TYPE_INVALID; + return NULL; + } + if (!(handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE)) + { + ERROR("Not with-type mode"); + *type = PIMS_IPC_DATA_TYPE_INVALID; + return NULL; + } + + dtype = *(int*)(handle->pos); + dsize = *(int*)(handle->pos+sizeof(int)); + buf = (handle->pos+sizeof(int)*2); + + switch(dtype) + { + case PIMS_IPC_DATA_TYPE_CHAR: + case PIMS_IPC_DATA_TYPE_UCHAR: + case PIMS_IPC_DATA_TYPE_INT: + case PIMS_IPC_DATA_TYPE_UINT: + case PIMS_IPC_DATA_TYPE_LONG: + case PIMS_IPC_DATA_TYPE_ULONG: + case PIMS_IPC_DATA_TYPE_FLOAT: + case PIMS_IPC_DATA_TYPE_DOUBLE: + case PIMS_IPC_DATA_TYPE_STRING: + case PIMS_IPC_DATA_TYPE_MEMORY: + break; + default: + ERROR("Unknown data type"); + dsize = 0; + break; + } + + if (dtype != PIMS_IPC_DATA_TYPE_STRING && dsize == 0) + { + *type = PIMS_IPC_DATA_TYPE_INVALID; + return NULL; + } + + if (dsize == 0) // current position is next type field becasue data size is 0 + buf = NULL; + + used_size = _get_used_size_with_type(dsize); + handle->pos += used_size; + handle->buf_size -= used_size; + handle->free_size += used_size; + + VERBOSE("data get type [%d] size [%u] data[%p]", dtype, dsize, buf); + *type = dtype; + *size = dsize; + return buf; +} + +API void* pims_ipc_data_get_dup_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size) +{ + void *buf = NULL; + + buf = pims_ipc_data_get_with_type(data, type, size); + return g_memdup(buf, *size); +} + +API void* pims_ipc_data_marshal(pims_ipc_data_h data, unsigned int *size) +{ + pims_ipc_data_t *handle = NULL; + + if (!data || !size ) + return NULL; + + handle = (pims_ipc_data_t *)data; + + *size = handle->buf_size; + + return handle->buf; +} + +static void __pims_ipc_data_free_cb(void *data, void *hint) +{ + if (hint) + g_free(hint); +} + +API int pims_ipc_data_marshal_with_zmq(pims_ipc_data_h data, zmq_msg_t *pzmsg) +{ + pims_ipc_data_t *handle = NULL; + + if (!data || !pzmsg ) + return -1; + + handle = (pims_ipc_data_t *)data; + + if (handle->zmsg_alloced) + { + // TODO: need to prevent memory leackage when reusing data marshaled + WARNING("It's ever been marshaled"); + } + + zmq_msg_init_data(&handle->zmsg, handle->buf, handle->buf_size, __pims_ipc_data_free_cb, handle->buf); + + if (zmq_msg_copy(pzmsg, &handle->zmsg) != 0) + { + zmq_msg_close(&handle->zmsg); + return -1; + } + + handle->zmsg_alloced = 1; + handle->buf_alloced = 0; + + return 0; +} + +API void* pims_ipc_data_marshal_dup(pims_ipc_data_h data, unsigned int *size) +{ + unsigned int lsize = 0; + gpointer buf = NULL; + + if (!data || !size ) + return NULL; + + buf = pims_ipc_data_marshal(data, &lsize); + *size = lsize; + return g_memdup(buf, lsize); +} + +API pims_ipc_data_h pims_ipc_data_unmarshal(void *buf, unsigned int size) +{ + pims_ipc_data_t *handle = NULL; + zmq_msg_t zmsg; + + zmq_msg_init_data(&zmsg, buf, size, __pims_ipc_data_free_cb, NULL); + + handle = pims_ipc_data_unmarshal_with_zmq(&zmsg); + zmq_msg_close(&zmsg); + + return handle; +} + +API pims_ipc_data_h pims_ipc_data_unmarshal_with_zmq(zmq_msg_t *pzmsg) +{ + pims_ipc_data_t *handle = NULL; + unsigned int size = 0; + void *ptr = NULL; + + handle = g_new0(pims_ipc_data_t, 1); + zmq_msg_init(&handle->zmsg); + if (zmq_msg_copy(&handle->zmsg, pzmsg) != 0) + { + g_free(handle); + return NULL; + } + handle->alloc_size = zmq_msg_size(&handle->zmsg); + handle->free_size = 0; + handle->buf_size = handle->alloc_size; + handle->buf = zmq_msg_data(&handle->zmsg); + handle->pos = handle->buf; + handle->created = 0; + handle->zmsg_alloced = 1; + + ptr = pims_ipc_data_get(handle, &size); + if (!ptr || size != sizeof(int)) + { + g_free(handle); + return NULL; + } + handle->flags = *((int*)ptr); + + VERBOSE("handle[%p] zmsg[%p] flags[%x]", handle, pzmsg, handle->flags); + + return handle; +} + +API pims_ipc_data_h pims_ipc_data_unmarshal_dup(void *buf, unsigned int size) +{ + pims_ipc_data_t *handle = NULL; + zmq_msg_t zmsg; + + zmq_msg_init_size(&zmsg, size); + memcpy(zmq_msg_data(&zmsg), buf, size); + + handle = pims_ipc_data_unmarshal_with_zmq(&zmsg); + zmq_msg_close(&zmsg); + + return handle; +} diff --git a/src/pims-ipc-svc.c b/src/pims-ipc-svc.c new file mode 100644 index 0000000..86a05c1 --- /dev/null +++ b/src/pims-ipc-svc.c @@ -0,0 +1,1242 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#define PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT 2 + +typedef struct +{ + char *service; + gid_t group; + mode_t mode; + GHashTable *cb_table; + GHashTable *client_table; + GList *workers; + GList *requests; + int workers_max_count; + void* context; + void* router; + void* worker; + void* manager; + void* monitor; +} pims_ipc_svc_t; + +typedef struct +{ + char *service; + gid_t group; + mode_t mode; + void* context; + void* publisher; +} pims_ipc_svc_for_publish_t; + + +typedef struct +{ + pims_ipc_svc_call_cb callback; + void * user_data; +} pims_ipc_svc_cb_t; + +static pims_ipc_svc_t *_g_singleton = NULL; +static pims_ipc_svc_for_publish_t *_g_singleton_for_publish = NULL; + +API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode) +{ + if (_g_singleton) + { + ERROR("Already exist"); + return -1; + } + + _g_singleton = g_new0(pims_ipc_svc_t, 1); + _g_singleton->service = g_strdup(service); + _g_singleton->group = group; + _g_singleton->mode = mode; + _g_singleton->workers_max_count = PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT; + _g_singleton->cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + ASSERT(_g_singleton->cb_table); + _g_singleton->client_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); + ASSERT(_g_singleton->client_table); + + return 0; +} + +static void __free_zmq_msg(gpointer data) +{ + zmq_msg_t *lpzmsg = data; + + if (lpzmsg) + { + zmq_msg_close(lpzmsg); + g_free(lpzmsg); + } +} + +API int pims_ipc_svc_deinit(void) +{ + if (!_g_singleton) + return -1; + + g_free(_g_singleton->service); + g_hash_table_destroy(_g_singleton->cb_table); + g_hash_table_destroy(_g_singleton->client_table); + g_list_free(_g_singleton->workers); + g_list_free_full(_g_singleton->requests, __free_zmq_msg); + g_free(_g_singleton); + _g_singleton = NULL; + + return 0; +} + +API int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata) +{ + pims_ipc_svc_cb_t *cb_data = NULL; + gchar *call_id = NULL; + + if (!module || !function || !callback) + { + ERROR("Invalid argument"); + return -1; + } + cb_data = g_new0(pims_ipc_svc_cb_t, 1); + call_id = PIMS_IPC_MAKE_CALL_ID(module, function); + + VERBOSE("register cb id[%s]", call_id); + cb_data->callback = callback; + cb_data->user_data = userdata; + g_hash_table_insert(_g_singleton->cb_table, call_id, cb_data); + + return 0; +} + +API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode) +{ + if (_g_singleton_for_publish) + { + ERROR("Already exist"); + return -1; + } + + _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_t, 1); + _g_singleton_for_publish->service = g_strdup(service); + _g_singleton_for_publish->group = group; + _g_singleton_for_publish->mode = mode; + + return 0; +} + +API int pims_ipc_svc_deinit_for_publish(void) +{ + if (!_g_singleton_for_publish) + return -1; + + g_free(_g_singleton_for_publish->service); + g_free(_g_singleton_for_publish); + _g_singleton_for_publish = NULL; + + return 0; +} + +static void __pims_ipc_svc_data_free_cb(void *data, void *hint) +{ + if (hint) + g_free(hint); +} + +API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data) +{ + pims_ipc_svc_for_publish_t *ipc_svc = _g_singleton_for_publish; + gboolean is_valid = FALSE; + gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event); + + // init messages + zmq_msg_t call_id_msg; + zmq_msg_t data_msg; + + zmq_msg_init_data(&call_id_msg, call_id, strlen(call_id) + 1, __pims_ipc_svc_data_free_cb, call_id); + VERBOSE("call id = %s", (char*)zmq_msg_data(&call_id_msg)); + + zmq_msg_init(&data_msg); + + do { + if (data == NULL) + { + // send call id + if (_pims_zmq_msg_send(&call_id_msg, ipc_svc->publisher, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + else + { + // send call id + if (_pims_zmq_msg_send(&call_id_msg, ipc_svc->publisher, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // marshal data + if (pims_ipc_data_marshal_with_zmq(data, &data_msg) != 0) + { + ERROR("marshal error"); + break; + } + + VERBOSE("the size of sending data = %d", zmq_msg_size(&data_msg)); + + // send data + if (_pims_zmq_msg_send(&data_msg, ipc_svc->publisher, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + + is_valid = TRUE; + } while (0); + + zmq_msg_close(&call_id_msg); + zmq_msg_close(&data_msg); + + if (is_valid == FALSE) + return -1; + return 0; +} + +static void __run_callback(int worker_id, char *call_id, pims_ipc_data_h dhandle_in, pims_ipc_data_h *dhandle_out) +{ + pims_ipc_svc_cb_t *cb_data = NULL; + + VERBOSE("Call id [%s]", call_id); + + cb_data = (pims_ipc_svc_cb_t*)g_hash_table_lookup(_g_singleton->cb_table, call_id); + if (cb_data == NULL) + { + VERBOSE("unable to find %s", call_id); + return; + } + + cb_data->callback((pims_ipc_h)worker_id, dhandle_in, dhandle_out, cb_data->user_data); +} + +static int __process_worker_task(int worker_id, void *context, void *worker) +{ + gboolean is_create = FALSE; + gboolean is_destroy = FALSE; + char *pid = NULL; + char *call_id = NULL; + int64_t more = 0; + size_t more_size = sizeof(more); + pims_ipc_data_h dhandle_in = NULL; + pims_ipc_data_h dhandle_out = NULL; + + VERBOSE(""); + +#ifdef _TEST + struct timeval tv; + gettimeofday(&tv, NULL); + printf("worker time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec); +#endif + + zmq_msg_t pid_msg; + zmq_msg_t sequence_no_msg; + zmq_msg_t call_id_msg; + zmq_msg_t data_msg; + + zmq_msg_init(&pid_msg); + zmq_msg_init(&sequence_no_msg); + zmq_msg_init(&call_id_msg); + zmq_msg_init(&data_msg); + + do { + // read pid + if (_pims_zmq_msg_recv(&pid_msg, worker, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + // read sequence no + if (_pims_zmq_msg_recv(&sequence_no_msg, worker, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + // read call id + if (_pims_zmq_msg_recv(&call_id_msg, worker, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + more = 0; + zmq_getsockopt(worker, ZMQ_RCVMORE, &more, &more_size); + if (more) + { + // read data + if (_pims_zmq_msg_recv(&data_msg, worker, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + dhandle_in = pims_ipc_data_unmarshal_with_zmq(&data_msg); + if (dhandle_in == NULL) + { + ERROR("unmarshal error"); + break; + } + } + + pid = (char*)zmq_msg_data(&pid_msg); + ASSERT(pid); + VERBOSE("client pid = %s", pid); + + call_id = (char*)zmq_msg_data(&call_id_msg); + ASSERT(call_id); + VERBOSE("call_id = [%s]", call_id); + + // call a callback function with call id and data + if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0) + { + is_create = TRUE; + } + else if (strcmp(PIMS_IPC_CALL_ID_DESTROY, call_id) == 0) + { + is_destroy = TRUE; + } + else + { + __run_callback(worker_id, call_id, dhandle_in, &dhandle_out); + } + + // send pid + if (_pims_zmq_msg_send(&pid_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // send empty + zmq_msg_t empty_msg; + zmq_msg_init_size(&empty_msg, 0); + if (_pims_zmq_msg_send(&empty_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + zmq_msg_close(&empty_msg); + break; + } + zmq_msg_close(&empty_msg); + + // send sequence no + if (_pims_zmq_msg_send(&sequence_no_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + if (dhandle_out) + { + // send call id + if (_pims_zmq_msg_send(&call_id_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // marshal data + zmq_msg_close(&data_msg); + zmq_msg_init(&data_msg); + if (pims_ipc_data_marshal_with_zmq(dhandle_out, &data_msg) != 0) + { + ERROR("marshal error"); + break; + } + + // send data + VERBOSE("the size of sending data = %d", zmq_msg_size(&data_msg)); + if (_pims_zmq_msg_send(&data_msg, worker, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + else + { + // send call id + if (_pims_zmq_msg_send(&call_id_msg, worker, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + } while (0); + + zmq_msg_close(&pid_msg); + zmq_msg_close(&sequence_no_msg); + zmq_msg_close(&call_id_msg); + zmq_msg_close(&data_msg); + + if (dhandle_in) + pims_ipc_data_destroy(dhandle_in); + if (dhandle_out) + pims_ipc_data_destroy(dhandle_out); + + VERBOSE("responsed"); + +#ifdef _TEST + gettimeofday(&tv, NULL); + printf("worker time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec); +#endif + + if (is_destroy) + return -1; + return 0; +} + +static int __process_manager_task(int worker_id, void *context, void *manager) +{ + VERBOSE(""); + + // read pid + zmq_msg_t pid_msg; + zmq_msg_init(&pid_msg); + if (_pims_zmq_msg_recv(&pid_msg, manager, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + zmq_msg_close(&pid_msg); + return -1; + } + zmq_msg_close(&pid_msg); + + return -1; +} + +static void* __worker_loop(void *args) +{ + void *context = args; + int worker_id = (int)pthread_self(); + char *path = NULL; + + void *worker = zmq_socket(context, ZMQ_DEALER); + if (!worker) + { + ERROR("socket error : %s", zmq_strerror(errno)); + return NULL; + } + if (zmq_setsockopt(worker, ZMQ_IDENTITY, &worker_id, sizeof(int)) != 0) + { + ERROR("setsockopt error : %s", zmq_strerror(errno)); + zmq_close(worker); + return NULL; + } + path = g_strdup_printf("inproc://%s-%s", _g_singleton->service, PIMS_IPC_DEALER_PATH); + if (zmq_connect(worker, path) != 0) + { + ERROR("connect error : %s", zmq_strerror(errno)); + g_free(path); + zmq_close(worker); + return NULL; + } + g_free(path); + + // send the ID of a worker to the manager + void *manager = zmq_socket(context, ZMQ_DEALER); + if (!manager) + { + ERROR("socket error : %s", zmq_strerror(errno)); + zmq_close(worker); + return NULL; + } + if (zmq_setsockopt(manager, ZMQ_IDENTITY, &worker_id, sizeof(int)) != 0) + { + ERROR("setsockopt error : %s", zmq_strerror(errno)); + zmq_close(manager); + zmq_close(worker); + return NULL; + } + path = g_strdup_printf("inproc://%s-%s", _g_singleton->service, PIMS_IPC_MANAGER_PATH); + if (zmq_connect(manager, path) != 0) + { + ERROR("connect error : %s", zmq_strerror(errno)); + g_free(path); + zmq_close(manager); + zmq_close(worker); + return NULL; + } + g_free(path); + + VERBOSE("starting worker id: %x", worker_id); + zmq_msg_t message; + zmq_msg_init_size(&message, sizeof(int)); + memcpy(zmq_msg_data(&message), &worker_id, sizeof(int)); + if (_pims_zmq_msg_send(&message, manager, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + zmq_msg_close(&message); + zmq_close(manager); + zmq_close(worker); + return NULL; + } + zmq_msg_close(&message); + + // poll all sockets + while (1) + { + zmq_pollitem_t items[] = { + {worker, 0, ZMQ_POLLIN, 0}, + {manager, 0, ZMQ_POLLIN, 0} + }; + + if (zmq_poll(items, 2, -1) == -1) + { + ERROR("poll error : %s", zmq_strerror(errno)); + break; + } + + if (items[0].revents & ZMQ_POLLIN) + { + if (__process_worker_task(worker_id, context, worker) != 0) + break; + } + + if (items[1].revents & ZMQ_POLLIN) + { + if (__process_manager_task(worker_id, context, manager) != 0) + break; + } + } + + VERBOSE("terminating worker id: %x", worker_id); + + zmq_close(manager); + zmq_close(worker); + return NULL; +} + +static void __launch_worker(void *(*start_routine) (void *), void *context) +{ + pthread_t worker; + pthread_attr_t attr; + + // set kernel thread + pthread_attr_init(&attr); + pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); + + pthread_create(&worker, &attr, start_routine, context); + // detach this thread + pthread_detach(worker); +} + +static gboolean __is_worker_available() +{ + if (_g_singleton->workers) + return TRUE; + else + return FALSE; +} + +static int __get_worker(const char *pid, int *worker_id) +{ + ASSERT(pid); + ASSERT(worker_id); + + if (!__is_worker_available()) + { + ERROR("There is no idle worker"); + return -1; + } + *worker_id = (int)(g_list_first(_g_singleton->workers)->data); + _g_singleton->workers = g_list_delete_link(_g_singleton->workers, + g_list_first(_g_singleton->workers)); + + g_hash_table_insert(_g_singleton->client_table, g_strdup(pid), GINT_TO_POINTER(*worker_id)); + + return 0; +} + +static int __find_worker(const char *pid, int *worker_id) +{ + char *orig_pid = NULL; + + ASSERT(pid); + ASSERT(worker_id); + + if (g_hash_table_lookup_extended(_g_singleton->client_table, pid, + (gpointer*)&orig_pid, (gpointer*)worker_id) == TRUE) + { + VERBOSE("found worker id for %s = %x", pid, *worker_id); + return 0; + } + else + { + VERBOSE("unable to find worker id for %s", pid); + return -1; + } +} + +static void __remove_worker(const char *pid) +{ + g_hash_table_remove(_g_singleton->client_table, pid); +} + +static void __terminate_worker(void *manager, int worker_id, const char *pid) +{ + // send worker id + zmq_msg_t worker_id_msg; + zmq_msg_init_size(&worker_id_msg, sizeof(int)); + memcpy(zmq_msg_data(&worker_id_msg), &worker_id, sizeof(int)); + if (_pims_zmq_msg_send(&worker_id_msg, manager, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + zmq_msg_close(&worker_id_msg); + return; + } + zmq_msg_close(&worker_id_msg); + + // send pid + zmq_msg_t pid_msg; + zmq_msg_init_data(&pid_msg, (char*)pid, strlen(pid) + 1, NULL, NULL); + if (_pims_zmq_msg_send(&pid_msg, manager, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + zmq_msg_close(&pid_msg); + return; + } + zmq_msg_close(&pid_msg); +} + +static gboolean __enqueue_zmq_msg(zmq_msg_t *zmsg) +{ + zmq_msg_t *lpzmsg = NULL; + + if (zmsg) + { + lpzmsg = g_new0(zmq_msg_t, 1); + zmq_msg_init(lpzmsg); + zmq_msg_copy(lpzmsg, zmsg); + } + _g_singleton->requests = g_list_append(_g_singleton->requests, lpzmsg); + + return TRUE; +} + +static gboolean __dequeue_zmq_msg(zmq_msg_t *zmsg) +{ + zmq_msg_t *lpzmsg = NULL; + + ASSERT(_g_singleton->requests); + lpzmsg = (zmq_msg_t*)(g_list_first(_g_singleton->requests)->data); + _g_singleton->requests = g_list_delete_link(_g_singleton->requests, + g_list_first(_g_singleton->requests)); + + if (lpzmsg == NULL) + return FALSE; + + zmq_msg_copy(zmsg, lpzmsg); + zmq_msg_close(lpzmsg); + g_free(lpzmsg); + + return TRUE; +} + +static int __process_router_event(void *context, void *router, void *worker, gboolean for_queue) +{ + char *pid = NULL; + char *call_id = NULL; + int64_t more = 0; + size_t more_size = sizeof(more); + int worker_id = -1; + gboolean is_with_data = FALSE; + gboolean is_valid = FALSE; + +#ifdef _TEST + struct timeval tv; + gettimeofday(&tv, NULL); + printf("router time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec); +#endif + + // init messages for receiving + zmq_msg_t pid_msg; + zmq_msg_t sequence_no_msg; + zmq_msg_t call_id_msg; + zmq_msg_t data_msg; + + zmq_msg_init(&pid_msg); + zmq_msg_init(&sequence_no_msg); + zmq_msg_init(&call_id_msg); + zmq_msg_init(&data_msg); + + // relay a request from a client to a worker + do { + if (for_queue) + { + // dequeue a request + __dequeue_zmq_msg(&pid_msg); + __dequeue_zmq_msg(&sequence_no_msg); + __dequeue_zmq_msg(&call_id_msg); + is_with_data = __dequeue_zmq_msg(&data_msg); + + if (is_with_data) + __dequeue_zmq_msg(NULL); + } + else + { + // read pid + if (_pims_zmq_msg_recv(&pid_msg, router, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + // read empty and kill + zmq_msg_t empty_msg; + zmq_msg_init(&empty_msg); + if (_pims_zmq_msg_recv(&empty_msg, router, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + zmq_msg_close(&empty_msg); + break; + } + zmq_msg_close(&empty_msg); + + // read sequence no + more = 0; + zmq_getsockopt(router, ZMQ_RCVMORE, &more, &more_size); + if (!more) + { + ERROR("recv error : corrupted message"); + break; + } + if (_pims_zmq_msg_recv(&sequence_no_msg, router, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + // read call id + more = 0; + zmq_getsockopt(router, ZMQ_RCVMORE, &more, &more_size); + if (!more) + { + ERROR("recv error : corrupted message"); + break; + } + if (_pims_zmq_msg_recv(&call_id_msg, router, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + // read data + more = 0; + zmq_getsockopt(router, ZMQ_RCVMORE, &more, &more_size); + if (more) + { + is_with_data = TRUE; + if (_pims_zmq_msg_recv(&data_msg, router, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + } + } + + pid = zmq_msg_data(&pid_msg); + ASSERT(pid != NULL); + VERBOSE("client pid = %s", pid); + + call_id = (char*)zmq_msg_data(&call_id_msg); + ASSERT(call_id != NULL); + VERBOSE("call_id = [%s], create_call_id = [%s]", PIMS_IPC_CALL_ID_CREATE, call_id); + if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0) + { + // Get a worker. If cannot get a worker, create a worker and enqueue a current request + __launch_worker(__worker_loop, context); + if (__get_worker((const char*)pid, &worker_id) != 0) + { + // enqueue a request until a new worker will be registered + __enqueue_zmq_msg(&pid_msg); + __enqueue_zmq_msg(&sequence_no_msg); + __enqueue_zmq_msg(&call_id_msg); + if (is_with_data) + __enqueue_zmq_msg(&data_msg); + __enqueue_zmq_msg(NULL); + + is_valid = TRUE; + break; + } + } + else + { + // Find a worker + if (__find_worker((const char*)pid, &worker_id) != 0) + { + ERROR("unable to find a worker"); + break; + } + + if (strcmp(PIMS_IPC_CALL_ID_DESTROY, call_id) == 0) + { + __remove_worker((const char*)pid); + } + } + + VERBOSE("routing worker id = %x", worker_id); + // send worker id + zmq_msg_t worker_id_msg; + zmq_msg_init_size(&worker_id_msg, sizeof(int)); + memcpy(zmq_msg_data(&worker_id_msg), &worker_id, sizeof(int)); + if (_pims_zmq_msg_send(&worker_id_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + zmq_msg_close(&worker_id_msg); + break; + } + zmq_msg_close(&worker_id_msg); + + // send pid + if (_pims_zmq_msg_send(&pid_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // send sequence no + if (_pims_zmq_msg_send(&sequence_no_msg, worker, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // send call id + if (_pims_zmq_msg_send(&call_id_msg, worker, is_with_data?ZMQ_SNDMORE:0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // send data + if (is_with_data) + { + if (_pims_zmq_msg_send(&data_msg, worker, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + + is_valid = TRUE; + } while (0); + + zmq_msg_close(&pid_msg); + zmq_msg_close(&sequence_no_msg); + zmq_msg_close(&call_id_msg); + zmq_msg_close(&data_msg); + +#ifdef _TEST + gettimeofday(&tv, NULL); + printf("router time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec); +#endif + + if (is_valid == FALSE) + return -1; + + return 0; +} + +static int __process_worker_event(void *context, void *worker, void *router) +{ + zmq_msg_t message; + int64_t more = 0; + size_t more_size = sizeof(more); + + // Remove worker_id + zmq_msg_init(&message); + if (_pims_zmq_msg_recv(&message, worker, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + } + zmq_msg_close(&message); + + while (1) + { + // Process all parts of the message + zmq_msg_init(&message); + if (_pims_zmq_msg_recv(&message, worker, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + } + more = 0; + zmq_getsockopt(worker, ZMQ_RCVMORE, &more, &more_size); + VERBOSE("router received a message : more[%u]", (unsigned int)more); + if (_pims_zmq_msg_send(&message, router, more?ZMQ_SNDMORE:0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + } + zmq_msg_close(&message); + if (!more) + break; // Last message part + } + + return 0; +} + +static int __process_manager_event(void *context, void *manager) +{ + zmq_msg_t worker_id_msg; + int worker_id = -1; + + zmq_msg_init(&worker_id_msg); + if (_pims_zmq_msg_recv(&worker_id_msg, manager, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + zmq_msg_close(&worker_id_msg); + return -1; + } + zmq_msg_close(&worker_id_msg); + + zmq_msg_init(&worker_id_msg); + if (_pims_zmq_msg_recv(&worker_id_msg, manager, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + zmq_msg_close(&worker_id_msg); + return -1; + } + memcpy(&worker_id, zmq_msg_data(&worker_id_msg), sizeof(int)); + zmq_msg_close(&worker_id_msg); + + VERBOSE("registered worker id = %x", worker_id); + _g_singleton->workers = g_list_append(_g_singleton->workers, GINT_TO_POINTER(worker_id)); + + return 0; +} + +static int __process_monitor_event(void *context, void *monitor, void *manager) +{ + int worker_id = -1; + char *pid = NULL; + zmq_msg_t pid_msg; + + VERBOSE(""); + + // read pid + zmq_msg_init(&pid_msg); + if (_pims_zmq_msg_recv(&pid_msg, monitor, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + zmq_msg_close(&pid_msg); + return -1; + } + + pid = (char*)zmq_msg_data(&pid_msg); + ASSERT(pid); + VERBOSE("client pid = %s", pid); + + if (__find_worker(pid, &worker_id) != 0) + return 0; + + VERBOSE("found worker id for %s = %x", pid, worker_id); + + __terminate_worker(manager, worker_id, pid); + __remove_worker(pid); + + zmq_msg_close(&pid_msg); + + return 0; +} + +static void __client_closed_cb(const char *pid, void *data) +{ + pims_ipc_svc_t *ipc_svc = (pims_ipc_svc_t*)data; + + VERBOSE("client pid = %s", pid); + + zmq_msg_t pid_msg; + zmq_msg_init_size(&pid_msg, strlen(pid) + 1); + memcpy(zmq_msg_data(&pid_msg), pid, strlen(pid) + 1); + if (_pims_zmq_msg_send(&pid_msg, ipc_svc->monitor, 0) == -1) + ERROR("send error : %s", zmq_strerror(errno)); + zmq_msg_close(&pid_msg); +} + +static int __open_zmq_socket(void *context, pims_ipc_svc_t *ipc_svc) +{ + char *path = NULL; + int ret = -1; + int i = 0; + + void *router = zmq_socket(context, ZMQ_ROUTER); + if (!router) + { + ERROR("socket error : %s", zmq_strerror(errno)); + return -1; + } + path = g_strdup_printf("ipc://%s", ipc_svc->service); + if (zmq_bind(router, path) != 0) + { + ERROR("bind error : %s", zmq_strerror(errno)); + zmq_close(router); + return -1; + } + g_free(path); + + ret = chown(ipc_svc->service, getuid(), ipc_svc->group); + ret = chmod(ipc_svc->service, ipc_svc->mode); + + void *worker = zmq_socket(context, ZMQ_ROUTER); + if (!worker) + { + ERROR("socket error : %s", zmq_strerror(errno)); + zmq_close(router); + return -1; + } + path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_DEALER_PATH); + if (zmq_bind(worker, path) != 0) + { + ERROR("bind error : %s", zmq_strerror(errno)); + zmq_close(router); + zmq_close(worker); + return -1; + } + g_free(path); + + void *manager = zmq_socket(context, ZMQ_ROUTER); + if (!manager) + { + ERROR("socket error : %s", zmq_strerror(errno)); + zmq_close(router); + zmq_close(worker); + return -1; + } + path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_MANAGER_PATH); + if (zmq_bind(manager, path) != 0) + { + ERROR("bind error : %s", zmq_strerror(errno)); + zmq_close(router); + zmq_close(worker); + zmq_close(manager); + return -1; + } + g_free(path); + + void *monitor = zmq_socket(context, ZMQ_PAIR); + if (!monitor) + { + ERROR("socket error : %s", zmq_strerror(errno)); + zmq_close(router); + zmq_close(worker); + zmq_close(manager); + return -1; + } + path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_MONITOR2_PATH); + if (zmq_bind(monitor, path) != 0) + { + ERROR("bind error : %s", zmq_strerror(errno)); + zmq_close(router); + zmq_close(worker); + zmq_close(manager); + zmq_close(monitor); + return -1; + } + g_free(path); + + ipc_svc->context = context; + ipc_svc->router = router; + ipc_svc->worker = worker; + ipc_svc->manager = manager; + ipc_svc->monitor = monitor; + + path = g_strdup_printf("%s-%s", ipc_svc->service, PIMS_IPC_MONITOR_PATH); + ret = _server_socket_init(path, ipc_svc->group, ipc_svc->mode, __client_closed_cb, ipc_svc); + ASSERT(ret != -1); + g_free(path); + + // launch worker threads in advance + for (i = 0; i < ipc_svc->workers_max_count; i++) + __launch_worker(__worker_loop, context); + + return 0; +} + +static void __close_zmq_socket(pims_ipc_svc_t *ipc_svc) +{ + zmq_close(ipc_svc->router); + zmq_close(ipc_svc->worker); + zmq_close(ipc_svc->manager); + zmq_close(ipc_svc->monitor); +} + +static int __open_zmq_socket_for_publish(void *context, pims_ipc_svc_for_publish_t *ipc_svc) +{ + char *path = NULL; + int ret = -1; + + ipc_svc->context = context; + void *publisher = NULL; + publisher = zmq_socket(context, ZMQ_PUB); + if (!publisher) + { + ERROR("socket error : %s", zmq_strerror(errno)); + return -1; + } + + path = g_strdup_printf("ipc://%s", ipc_svc->service); + if (zmq_bind(publisher, path) != 0) + { + ERROR("bind error : %s", zmq_strerror(errno)); + zmq_close(publisher); + return -1; + } + g_free(path); + + ret = chown(ipc_svc->service, getuid(), ipc_svc->group); + ret = chmod(ipc_svc->service, ipc_svc->mode); + + ipc_svc->context = context; + ipc_svc->publisher = publisher; + + return 0; +} + +static void __close_zmq_socket_for_publish(pims_ipc_svc_for_publish_t *ipc_svc) +{ + zmq_close(ipc_svc->publisher); +} + +static void* __main_loop(void *args) +{ + char *path = NULL; + int ret = -1; + pims_ipc_svc_t *ipc_svc = (pims_ipc_svc_t*)args; + + void *monitor_peer = zmq_socket(ipc_svc->context, ZMQ_PAIR); + ASSERT(monitor_peer); + + path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_MONITOR2_PATH); + ret = zmq_connect(monitor_peer, path); + ASSERT(ret == 0); + g_free(path); + + // poll all sockets + while (1) + { + zmq_pollitem_t items[] = { + {ipc_svc->router, 0, ZMQ_POLLIN, 0}, + {ipc_svc->worker, 0, ZMQ_POLLIN, 0}, + {ipc_svc->manager, 0, ZMQ_POLLIN, 0}, + {monitor_peer, 0, ZMQ_POLLIN, 0} + }; + + if (zmq_poll(items, 4, -1) == -1) + { + if (errno == EINTR) + continue; + + ERROR("poll error : %s", zmq_strerror(errno)); + break; + } + + if (items[0].revents & ZMQ_POLLIN) + { + __process_router_event(ipc_svc->context, ipc_svc->router, ipc_svc->worker, FALSE); + } + + if (items[1].revents & ZMQ_POLLIN) + { + __process_worker_event(ipc_svc->context, ipc_svc->worker, ipc_svc->router); + } + + if (items[2].revents & ZMQ_POLLIN) + { + __process_manager_event(ipc_svc->context, ipc_svc->manager); + if (ipc_svc->requests) + __process_router_event(ipc_svc->context, ipc_svc->router, ipc_svc->worker, TRUE); + } + + if (items[3].revents & ZMQ_POLLIN) + { + __process_monitor_event(ipc_svc->context, monitor_peer, ipc_svc->manager); + } + } + + zmq_close(monitor_peer); + + return NULL; +} + +API void pims_ipc_svc_run_main_loop(GMainLoop* loop) +{ + int retval = -1; + GMainLoop* main_loop = loop; + + if(main_loop == NULL) { + main_loop = g_main_loop_new(NULL, FALSE); + } + + void *context = zmq_init(1); + ASSERT (context != NULL); + + if (_g_singleton_for_publish) + { + retval = __open_zmq_socket_for_publish(context, _g_singleton_for_publish); + ASSERT(retval == 0); + } + + if (_g_singleton) + { + retval = __open_zmq_socket(context, _g_singleton); + ASSERT(retval == 0); + } + + __launch_worker(__main_loop, _g_singleton); + + g_main_loop_run(main_loop); + + if (_g_singleton) + { + __close_zmq_socket(_g_singleton); + } + + if (_g_singleton_for_publish) + { + __close_zmq_socket_for_publish(_g_singleton_for_publish); + } + + if (zmq_term(context) == -1) + WARNING("term error : %s", zmq_strerror(errno)); +} diff --git a/src/pims-ipc.c b/src/pims-ipc.c new file mode 100644 index 0000000..5f9be24 --- /dev/null +++ b/src/pims-ipc.c @@ -0,0 +1,813 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\ + sequence_no = ++((handle)->call_sequence_no);\ +} while (0) + +static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER; + +typedef enum +{ + PIMS_IPC_CALL_STATUS_READY = 0, + PIMS_IPC_CALL_STATUS_IN_PROGRESS +} pims_ipc_call_status_e; + +typedef enum +{ + PIMS_IPC_MODE_REQ = 0, + PIMS_IPC_MODE_SUB +} pims_ipc_mode_e; + +typedef struct +{ + pid_t pid; + void *context; + unsigned int ref_cnt; + unsigned int id_sequence_no; + GList *subscribe_handles; +} pims_ipc_context_t; + +static pims_ipc_context_t *_g_singleton = NULL; + +typedef struct +{ + pims_ipc_subscribe_cb callback; + void * user_data; +} pims_ipc_cb_t; + +typedef struct +{ + int fd; + void *requester; + char *service; + char *id; + GIOChannel *async_channel; + guint async_source_id; + pims_ipc_call_status_e call_status; + unsigned int call_sequence_no; + pims_ipc_call_async_cb call_async_callback; + void *call_async_userdata; + GHashTable *subscribe_cb_table; +} pims_ipc_t; + + +static void __pims_ipc_free_handle(pims_ipc_t *handle) +{ + pthread_mutex_lock(&__gmutex); + + g_free(handle->id); + g_free(handle->service); + + if (handle->requester) + zmq_close(handle->requester); + + if (handle->fd != -1) + close(handle->fd); + + if (handle->async_channel) + { + // remove a subscriber handle from the golbal list + if (_g_singleton) + { + _g_singleton->subscribe_handles = g_list_remove(_g_singleton->subscribe_handles, handle); + VERBOSE("the count of subscribe handles = %d", g_list_length(_g_singleton->subscribe_handles)); + } + + g_source_remove(handle->async_source_id); + g_io_channel_unref(handle->async_channel); + } + + if (handle->subscribe_cb_table) + g_hash_table_destroy(handle->subscribe_cb_table); + + g_free(handle); + + if (_g_singleton && --_g_singleton->ref_cnt <= 0) + { + if (zmq_term(_g_singleton->context) == -1) + { + WARNING("term error : %s", zmq_strerror(errno)); + } + g_free(_g_singleton); + _g_singleton = NULL; + } + + pthread_mutex_unlock(&__gmutex); +} + +static int __pims_ipc_receive_for_subscribe(pims_ipc_t *handle) +{ + gboolean is_valid = FALSE; + int64_t more = 0; + pims_ipc_data_h dhandle = NULL; + pims_ipc_cb_t *cb_data = NULL; + + zmq_msg_t call_id_msg; + zmq_msg_t data_msg; + + zmq_msg_init(&call_id_msg); + zmq_msg_init(&data_msg); + + do { + // recv call id + if (_pims_zmq_msg_recv(&call_id_msg, handle->requester, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + // find a callback by a call id + cb_data = (pims_ipc_cb_t*)g_hash_table_lookup(handle->subscribe_cb_table, zmq_msg_data(&call_id_msg)); + + size_t more_size = sizeof(more); + zmq_getsockopt(handle->requester, ZMQ_RCVMORE, &more, &more_size); + if (more) + { + if (_pims_zmq_msg_recv(&data_msg, handle->requester, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + if (cb_data == NULL) + { + VERBOSE("unable to find %s", (char*)zmq_msg_data(&call_id_msg)); + is_valid = TRUE; + break; + } + + dhandle = pims_ipc_data_unmarshal_with_zmq(&data_msg); + if (dhandle == NULL) + { + ERROR("unmarshal error"); + break; + } + + cb_data->callback((pims_ipc_h)handle, dhandle, cb_data->user_data); + } + + is_valid = TRUE; + } while (0); + + zmq_msg_close(&call_id_msg); + zmq_msg_close(&data_msg); + if (dhandle) + pims_ipc_data_destroy(dhandle); + + if (is_valid == FALSE) + return -1; + return 0; +} + +static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data) +{ + pims_ipc_t *handle = (pims_ipc_t *)data; + uint32_t zmq_events = 0; + size_t opt_len = 0; + int rc = 0; + + VERBOSE(""); + + pthread_mutex_lock(&__gmutex); + + // check if a subscriber handle is exists + if (_g_singleton == NULL || g_list_find(_g_singleton->subscribe_handles, handle) == NULL) + { + DEBUG("No such handle that ID is %p", handle); + pthread_mutex_unlock(&__gmutex); + return FALSE; + } + + opt_len = sizeof(uint32_t); + while (1) + { + rc = zmq_getsockopt(handle->requester, ZMQ_EVENTS, &zmq_events, &opt_len); + ASSERT(rc == 0); + if (ZMQ_POLLIN & zmq_events) { + __pims_ipc_receive_for_subscribe(handle); + } + else + { + break; + } + } + + pthread_mutex_unlock(&__gmutex); + + return TRUE; +} + +static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode) +{ + pims_ipc_context_t *ghandle = NULL; + pims_ipc_t *handle = NULL; + pid_t pid = 0; + void *context = NULL; + void *requester = NULL; + char *path = NULL; + gboolean is_ok = FALSE; + + pthread_mutex_lock(&__gmutex); + + do { + if (_g_singleton == NULL) + { + ghandle = g_new0(pims_ipc_context_t, 1); + if (ghandle == NULL) + { + ERROR("Failed to allocation"); + break; + } + + pid = getpid(); + ghandle->pid = pid; + VERBOSE("The PID of the current process is %d.", pid); + + context = zmq_init(1); + if (!context) + { + ERROR("init error : %s", zmq_strerror(errno)); + break; + } + ghandle->context = context; + ghandle->id_sequence_no = (unsigned int)time(NULL); + ghandle->ref_cnt = 1; + _g_singleton = ghandle; + } + else + { + ghandle = _g_singleton; + ghandle->ref_cnt++; + pid = ghandle->pid; + context = ghandle->context; + } + + VERBOSE("Create %d th..", ghandle->ref_cnt); + + handle = g_new0(pims_ipc_t, 1); + if (handle == NULL) + { + ERROR("Failed to allocation"); + break; + } + handle->fd = -1; + + handle->service = g_strdup(service); + handle->id = g_strdup_printf("%x:%x", pid, ghandle->id_sequence_no++); + + if (mode == PIMS_IPC_MODE_REQ) + { + path = g_strdup_printf("%s-%s", handle->service, PIMS_IPC_MONITOR_PATH); + handle->fd = _client_socket_init(path, handle->id); + if (handle->fd == -1) + { + g_free(path); + break; + } + g_free(path); + + requester = zmq_socket(context, ZMQ_REQ); + if (!requester) + { + ERROR("socket error : %s", zmq_strerror(errno)); + break; + } + if (zmq_setsockopt(requester, ZMQ_IDENTITY, handle->id, strlen(handle->id) + 1) != 0) + { + ERROR("setsockopt error : %s", zmq_strerror(errno)); + break; + } + handle->requester = requester; + + path = g_strdup_printf("ipc://%s", handle->service); + if (zmq_connect(requester, path) != 0) + { + ERROR("connect error : %s", zmq_strerror(errno)); + g_free(path); + break; + } + g_free(path); + + handle->call_sequence_no = (unsigned int)time(NULL); + if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) + { + WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed"); + } + } + else + { + requester = zmq_socket(context, ZMQ_SUB); + if (!requester) + { + ERROR("socket error : %s", zmq_strerror(errno)); + break; + } + if (zmq_setsockopt(requester, ZMQ_SUBSCRIBE, "", 0) != 0) + { + ERROR("setsockopt error : %s", zmq_strerror(errno)); + break; + } + handle->requester = requester; + + path = g_strdup_printf("ipc://%s", handle->service); + if (zmq_connect(requester, path) != 0) + { + ERROR("connect error : %s", zmq_strerror(errno)); + g_free(path); + break; + } + g_free(path); + + int fd = -1; + size_t opt_len = sizeof(int); + int rc = zmq_getsockopt(handle->requester, ZMQ_FD, &fd, &opt_len); + ASSERT(rc == 0); + + handle->async_channel = g_io_channel_unix_new(fd); + if (!handle->async_channel) + { + ERROR("g_io_channel_unix_new error"); + break; + } + + guint source_id = 0; + source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_subscribe_handler, handle); + handle->async_source_id = source_id; + handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + ASSERT(handle->subscribe_cb_table); + + // add a subscriber handle to the global list + ghandle->subscribe_handles = g_list_append(ghandle->subscribe_handles, handle); + VERBOSE("the count of subscribe handles = %d", g_list_length(ghandle->subscribe_handles)); + } + + is_ok = TRUE; + VERBOSE("A new handle is created : %s, %s", handle->service, handle->id); + } while(0); + + pthread_mutex_unlock(&__gmutex); + + if (FALSE == is_ok) + { + if (handle) + { + __pims_ipc_free_handle(handle); + handle = NULL; + } + } + + return handle; +} + +API pims_ipc_h pims_ipc_create(char *service) +{ + return __pims_ipc_create(service, PIMS_IPC_MODE_REQ); +} + +API pims_ipc_h pims_ipc_create_for_subscribe(char *service) +{ + return __pims_ipc_create(service, PIMS_IPC_MODE_SUB); +} + +static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode) +{ + pims_ipc_t *handle = (pims_ipc_t *)ipc; + + if (mode == PIMS_IPC_MODE_REQ) + { + if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) + { + WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed"); + } + } + + __pims_ipc_free_handle(handle); +} + +API void pims_ipc_destroy(pims_ipc_h ipc) +{ + __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ); +} + +API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc) +{ + __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB); +} + +static void __pims_ipc_data_free_cb(void *data, void *hint) +{ + if (hint) + g_free(hint); +} + +static int __pims_ipc_send(pims_ipc_t *handle, char *module, char *function, pims_ipc_h data_in) +{ + gboolean is_valid = FALSE; + unsigned int sequence_no = 0; + gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function); + + // init messages + zmq_msg_t sequence_no_msg; + zmq_msg_t call_id_msg; + zmq_msg_t data_in_msg; + + zmq_msg_init_size(&sequence_no_msg, sizeof(unsigned int)); + GET_CALL_SEQUNECE_NO(handle, sequence_no); + memcpy(zmq_msg_data(&sequence_no_msg), &(sequence_no), sizeof(unsigned int)); + + zmq_msg_init_data(&call_id_msg, call_id, strlen(call_id) + 1, __pims_ipc_data_free_cb, call_id); + VERBOSE("call id = %s", (char*)zmq_msg_data(&call_id_msg)); + + zmq_msg_init(&data_in_msg); + + do { + // send sequence no + if (_pims_zmq_msg_send(&sequence_no_msg, handle->requester, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + if (data_in == NULL) + { + // send call id + if (_pims_zmq_msg_send(&call_id_msg, handle->requester, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + else + { + // send call id + if (_pims_zmq_msg_send(&call_id_msg, handle->requester, ZMQ_SNDMORE) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + + // marshal data + if (pims_ipc_data_marshal_with_zmq(data_in, &data_in_msg) != 0) + { + ERROR("marshal error"); + break; + } + + VERBOSE("the size of sending data = %d", zmq_msg_size(&data_in_msg)); + + // send data + if (_pims_zmq_msg_send(&data_in_msg, handle->requester, 0) == -1) + { + ERROR("send error : %s", zmq_strerror(errno)); + break; + } + } + + is_valid = TRUE; + } while (0); + + zmq_msg_close(&sequence_no_msg); + zmq_msg_close(&call_id_msg); + zmq_msg_close(&data_in_msg); + + if (is_valid == FALSE) + return -1; + return 0; +} + +static int __pims_ipc_receive(pims_ipc_t *handle, pims_ipc_data_h *data_out) +{ + gboolean is_ok = FALSE; + gboolean is_valid = FALSE; + int64_t more = 0; + pims_ipc_data_h dhandle = NULL; + unsigned int sequence_no = 0; + + zmq_msg_t sequence_no_msg; + zmq_msg_t call_id_msg; + zmq_msg_t data_out_msg; + + while (1) + { + is_valid = FALSE; + more = 0; + + zmq_msg_init(&sequence_no_msg); + zmq_msg_init(&call_id_msg); + zmq_msg_init(&data_out_msg); + + do { + // recv sequence no + if (_pims_zmq_msg_recv(&sequence_no_msg, handle->requester, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + memcpy(&sequence_no, zmq_msg_data(&sequence_no_msg), sizeof(unsigned int)); + + // recv call id + if (_pims_zmq_msg_recv(&call_id_msg, handle->requester, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + + size_t more_size = sizeof(more); + zmq_getsockopt(handle->requester, ZMQ_RCVMORE, &more, &more_size); + if (more) + { + if (_pims_zmq_msg_recv(&data_out_msg, handle->requester, 0) == -1) + { + ERROR("recv error : %s", zmq_strerror(errno)); + break; + } + dhandle = pims_ipc_data_unmarshal_with_zmq(&data_out_msg); + if (dhandle == NULL) + { + ERROR("unmarshal error"); + break; + } + + if (sequence_no == handle->call_sequence_no) + { + if (data_out != NULL) + *data_out = dhandle; + is_ok = TRUE; + } + else + { + pims_ipc_data_destroy(dhandle); + DEBUG("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no); + } + } + else + { + if (sequence_no == handle->call_sequence_no) + is_ok = TRUE; + } + + is_valid = TRUE; + } while (0); + + zmq_msg_close(&sequence_no_msg); + zmq_msg_close(&call_id_msg); + zmq_msg_close(&data_out_msg); + + if (is_ok) + return 0; + + if (is_valid == FALSE) + return -1; + } + + return -1; +} + +API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, + pims_ipc_data_h *data_out) +{ + pims_ipc_t *handle = (pims_ipc_t *)ipc; + + + if (ipc == NULL) + { + ERROR("invalid handle : %p", ipc); + return -1; + } + + if (!module || !function) + { + ERROR("invalid argument"); + return -1; + } + + if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) + { + ERROR("the previous call is in progress : %p", ipc); + return -1; + } + + if (__pims_ipc_send(handle, module, function, data_in) != 0) + { + return -1; + } + + if (__pims_ipc_receive(handle, data_out) != 0) + { + return -1; + } + + return 0; +} + +static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data) +{ + pims_ipc_t *handle = (pims_ipc_t *)data; + uint32_t zmq_events = 0; + size_t opt_len = 0; + int rc = 0; + + VERBOSE(""); + + opt_len = sizeof(uint32_t); + while (1) + { + rc = zmq_getsockopt(handle->requester, ZMQ_EVENTS, &zmq_events, &opt_len); + ASSERT(rc == 0); + if (ZMQ_POLLIN & zmq_events) { + pims_ipc_data_h dhandle = NULL; + if (__pims_ipc_receive(handle, &dhandle) == 0) + { + VERBOSE("call status = %d", handle->call_status); + if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) + { + pims_ipc_data_destroy(dhandle); + } + else + { + handle->call_status = PIMS_IPC_CALL_STATUS_READY; + handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata); + pims_ipc_data_destroy(dhandle); + } + } + } + else + { + break; + } + } + + return FALSE; +} + +API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in, + pims_ipc_call_async_cb callback, void *userdata) +{ + pims_ipc_t *handle = (pims_ipc_t *)ipc; + guint source_id = 0; + + if (ipc == NULL) + { + ERROR("invalid handle : %p", ipc); + return -1; + } + + if (!module || !function || !callback) + { + ERROR("invalid argument"); + return -1; + } + + if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) + { + ERROR("the previous call is in progress : %p", ipc); + return -1; + } + + handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS; + handle->call_async_callback = callback; + handle->call_async_userdata = userdata; + + // add a callback for GIOChannel + if (!handle->async_channel) + { + int fd = -1; + size_t opt_len = sizeof(int); + int rc = zmq_getsockopt(handle->requester, ZMQ_FD, &fd, &opt_len); + ASSERT(rc == 0); + + handle->async_channel = g_io_channel_unix_new(fd); + if (!handle->async_channel) + { + ERROR("g_io_channel_unix_new error"); + return -1; + } + } + + source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle); + handle->async_source_id = source_id; + + if (__pims_ipc_send(handle, module, function, data_in) != 0) + { + g_source_remove(source_id); + return -1; + } + + uint32_t zmq_events = 0; + size_t opt_len = sizeof(uint32_t); + int rc = 0; + rc = zmq_getsockopt(handle->requester, ZMQ_EVENTS, &zmq_events, &opt_len); + ASSERT(rc == 0); + + return 0; +} + +API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc) +{ + pims_ipc_t *handle = (pims_ipc_t *)ipc; + + if (ipc == NULL) + { + ERROR("invalid handle : %p", ipc); + return false; + } + + if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS) + return true; + else + return false; +} + +API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata) +{ + gchar *call_id = NULL; + pims_ipc_cb_t *cb_data = NULL; + pims_ipc_t *handle = (pims_ipc_t *)ipc; + + if (ipc == NULL || handle->subscribe_cb_table == NULL) + { + ERROR("invalid handle : %p", ipc); + return -1; + } + + if (!module || !event || !callback) + { + ERROR("invalid argument"); + return -1; + } + + cb_data = g_new0(pims_ipc_cb_t, 1); + call_id = PIMS_IPC_MAKE_CALL_ID(module, event); + + VERBOSE("subscribe cb id[%s]", call_id); + cb_data->callback = callback; + cb_data->user_data = userdata; + g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data); + + return 0; +} + +API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event) +{ + gchar *call_id = NULL; + pims_ipc_t *handle = (pims_ipc_t *)ipc; + + if (ipc == NULL || handle->subscribe_cb_table == NULL) + { + ERROR("invalid handle : %p", ipc); + return -1; + } + + if (!module || !event) + { + ERROR("invalid argument"); + return -1; + } + + call_id = PIMS_IPC_MAKE_CALL_ID(module, event); + + VERBOSE("unsubscribe cb id[%s]", call_id); + + if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) + { + ERROR("g_hash_table_remove error"); + g_free(call_id); + return -1; + } + + g_free(call_id); + return 0; +} diff --git a/src/pims-socket.c b/src/pims-socket.c new file mode 100644 index 0000000..f2ee131 --- /dev/null +++ b/src/pims-socket.c @@ -0,0 +1,286 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef _NON_SLP +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +typedef struct +{ + int sockfd; + server_socket_client_closed_cb callback; + void *user_data; + GHashTable *client_table; +} server_socket_context_t; + +static int __socket_writen(int fd, char *buf, int buf_size) +{ + int ret, writed = 0; + while (buf_size) + { + ret = write(fd, buf+writed, buf_size); + if (-1 == ret) + { + if (EINTR == errno) + continue; + else + return ret; + } + writed += ret; + buf_size -= ret; + } + return writed; +} + +static int __socket_readn(int fd, char *buf, int buf_size) +{ + int ret, read_size = 0; + + while (buf_size) + { + ret = read(fd, buf+read_size, buf_size); + if (-1 == ret) + { + if (EINTR == errno) + continue; + else + return ret; + } + read_size += ret; + buf_size -= ret; + } + return read_size; +} + +#define PIMS_IPC_PID_BUFFER_SIZE 20 +static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data) +{ + server_socket_context_t *context = (server_socket_context_t*)data; + int ret = -1; + int fd = -1; + int orig_fd = -1; + char *pid = NULL; + char buffer[PIMS_IPC_PID_BUFFER_SIZE] = ""; + + fd = g_io_channel_unix_get_fd(src); + + if (G_IO_HUP & condition) + { + close(fd); + + if (g_hash_table_lookup_extended(context->client_table, GINT_TO_POINTER(fd), + (gpointer*)&orig_fd, (gpointer*)&pid) == TRUE) + { + VERBOSE("found pid for %u = %s", fd, pid); + context->callback((const char*)pid, context->user_data); + g_hash_table_remove(context->client_table, (gconstpointer)fd); + } + else + { + VERBOSE("unable to find pid for %u", fd); + } + + return FALSE; + } + + memset(buffer, 0x00, PIMS_IPC_PID_BUFFER_SIZE); + ret = read(fd, (char *)buffer, PIMS_IPC_PID_BUFFER_SIZE-1); + if (ret <= 0) + { + ERROR("read error : %s", strerror(errno)); + close(fd); + + return FALSE; + } + + VERBOSE("client fd = %u, pid = %s", fd, buffer); + g_hash_table_insert(context->client_table, GINT_TO_POINTER(fd), g_strdup(buffer)); + + pid_t mypid = getpid(); + ret = __socket_writen(fd, (char*)&mypid, sizeof(pid_t)); + if (ret != sizeof(pid_t)) + { + ERROR("write error : %s", strerror(errno)); + close(fd); + g_hash_table_remove(context->client_table, (gconstpointer)fd); + + return FALSE; + } + + return TRUE; +} + +static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data) +{ + GIOChannel *channel; + server_socket_context_t *context = (server_socket_context_t*)data; + int client_sockfd = -1; + int sockfd = context->sockfd; + struct sockaddr_un clientaddr; + socklen_t client_len = sizeof(clientaddr); + + client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len); + if (-1 == client_sockfd) + { + ERROR("accept error : %s", strerror(errno)); + return TRUE; + } + + channel = g_io_channel_unix_new(client_sockfd); + g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data); + g_io_channel_unref(channel); + + return TRUE; +} + +int _server_socket_init(const char *path, gid_t group, mode_t mode, + server_socket_client_closed_cb callback, void *user_data) +{ + int sockfd = -1; + GIOChannel *gio = NULL; + + if (sd_listen_fds(1) == 1 && sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, path, 0) > 0) + { + DEBUG("using system daemon"); + + sockfd = SD_LISTEN_FDS_START; + } + else + { + struct sockaddr_un addr; + int ret = -1; + + DEBUG("using local socket"); + + unlink(path); + + bzero(&addr, sizeof(addr)); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path); + + sockfd = socket(PF_UNIX, SOCK_STREAM, 0); + if (-1 == sockfd) + { + ERROR("socket error : %s", strerror(errno)); + return -1; + } + + ret = bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)); + if (-1 == ret) + { + ERROR("bind error : %s", strerror(errno)); + close(sockfd); + return -1; + } + + ret = chown(path, getuid(), group); + ret = chmod(path, mode); + + ret = listen(sockfd, 30); + if (-1 == ret) + { + ERROR("listen error : %s", strerror(errno)); + close(sockfd); + return -1; + } + } + + gio = g_io_channel_unix_new(sockfd); + + server_socket_context_t *context = g_new0(server_socket_context_t, 1); + context->sockfd = sockfd; + context->callback = callback; + context->user_data = user_data; + context->client_table = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free); + ASSERT(context->client_table); + + g_io_add_watch(gio, G_IO_IN, __socket_handler, (gpointer)context); + + return sockfd; +} + +int _client_socket_init(const char *path, const char *pid) +{ + int sockfd = -1; + int ret = -1; + struct sockaddr_un caddr = {0}; + pid_t server_pid = 0; + + ASSERT(path != NULL); + ASSERT(pid != NULL); + bzero(&caddr, sizeof(caddr)); + caddr.sun_family = AF_UNIX; + snprintf(caddr.sun_path, sizeof(caddr.sun_path), "%s", path); + + sockfd = socket(PF_UNIX, SOCK_STREAM, 0); + if (-1 == sockfd) + { + ERROR("socket error : %s", strerror(errno)); + return -1; + } + + ret = connect(sockfd, (struct sockaddr *)&caddr, sizeof(caddr)); + if (-1 == ret) { + ERROR("connect error : %s", strerror(errno)); + close(sockfd); + return -1; + } + ret = __socket_writen(sockfd, (char*)pid, strlen(pid) + 1); + if (ret <= 0) + { + ERROR("write error : %s", strerror(errno)); + close(sockfd); + return -1; + } + ret = __socket_readn(sockfd, (char*)&server_pid, sizeof(pid_t)); + if (ret != sizeof(pid_t)) + { + ERROR("read error : %s", strerror(errno)); + close(sockfd); + return -1; + } + + return sockfd; +} + +#else +#include + +int _server_socket_init(const char *path, gid_t group, mode_t mode, + server_socket_client_closed_cb callback, void *user_data) +{ + return 0; +} + +int _client_socket_init(const char *path, const char *pid) +{ + return 0; +} + +#endif diff --git a/src/pims-socket.h b/src/pims-socket.h new file mode 100644 index 0000000..5cf2696 --- /dev/null +++ b/src/pims-socket.h @@ -0,0 +1,42 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef __PIMS_SOCKET_H__ +#define __PIMS_SOCKET_H__ + +#include +#include +#include + +#ifdef _cplusplus +extern "C" +{ +#endif + +typedef void (*server_socket_client_closed_cb)(const char *pid, void *user_data); +int _server_socket_init(const char *path, gid_t group, mode_t mode, + server_socket_client_closed_cb callback, void *user_data); +int _client_socket_init(const char *path, const char *pid); + +#ifdef _cplusplus +} +#endif + +#endif /* __PIMS_SOCKET_H__ */ + diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt new file mode 100755 index 0000000..fb2513b --- /dev/null +++ b/test/CMakeLists.txt @@ -0,0 +1,44 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.6) +PROJECT(pims_ipc_test C) + +#################################################################### +# Basic configuration # +#################################################################### +# Set ready to build +SET(EXT_LIBS_DIRS "") +SET(EXT_LIBS_DEFS "") +SET(EXT_LIBS_LDFLAGS "") + +# Set external libraries +SET(EXT_LIBS + E_GLIB +) + +FOREACH(flag ${EXT_LIBS}) + SET(EXT_LIBS_DIRS ${EXT_LIBS_DIRS} ${${flag}_INCLUDE_DIRS}) + SET(EXT_LIBS_DEFS ${EXT_LIBS_DEFS} ${${flag}_CFLAGS_OTHER}) + SET(EXT_LIBS_LDFLAGS ${EXT_LIBS_LDFLAGS} ${${flag}_LDFLAGS}) +ENDFOREACH(flag) + +#################################################################### +# Build this project # +#################################################################### + +# Set source +SET(SRCS + ${CMAKE_CURRENT_SOURCE_DIR}/test.c +) + +INCLUDE_DIRECTORIES( + ${CMAKE_SOURCE_DIR}/include + ${EXT_LIBS_DIRS} +) + +ADD_DEFINITIONS( + ${EXT_LIBS_DEFS} +) + +ADD_EXECUTABLE(${PROJECT_NAME} ${SRCS}) +TARGET_LINK_LIBRARIES(${PROJECT_NAME} pims-ipc ${EXT_LIBS_LDFLAGS}) +INSTALL(TARGETS ${PROJECT_NAME} DESTINATION lib) + diff --git a/test/sock-test.c b/test/sock-test.c new file mode 100644 index 0000000..b086864 --- /dev/null +++ b/test/sock-test.c @@ -0,0 +1,178 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include + +#define NAME "dom_sock_test" + +#define REPEAT_COUNT 100 +//#define BUFFER_SIZE 1024*1024 +#define BUFFER_SIZE 1024 + +#define PLUS_TIME(a, b, c) do {\ + a.tv_sec = b.tv_sec + c.tv_sec;\ + a.tv_usec = b.tv_usec + c.tv_usec;\ + if (a.tv_usec >= 1000000) {\ + a.tv_sec++;\ + a.tv_usec -= 1000000;\ + }\ +} while (0) + +#define MINUS_TIME(a, b, c) do {\ + a.tv_sec = b.tv_sec - c.tv_sec;\ + a.tv_usec = b.tv_usec - c.tv_usec;\ + if (a.tv_usec < 0) {\ + a.tv_sec--;\ + a.tv_usec += 1000000;\ + }\ +} while (0) + + +static char buffer[BUFFER_SIZE+1] = {0,}; + +void server_main() +{ + int sock, msgsock, rval; + struct sockaddr_un server; + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + perror("opening stream socket"); + exit(1); + } + + server.sun_family = AF_UNIX; + strcpy(server.sun_path, NAME); + if (bind(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un))) { + perror("binding stream socket"); + exit(1); + } + printf("Socket has name %s\n", server.sun_path); + listen(sock, 5); + for (;;) { + msgsock = accept(sock, 0, 0); + if (msgsock == -1) + perror("accept"); + else do { + if ((rval = read(msgsock, buffer, BUFFER_SIZE)) < 0) + perror("reading stream message"); + else if (rval == 0) + printf("Ending connection\n"); + else + { + if ((rval = write(msgsock, buffer, BUFFER_SIZE)) < 0) + perror("writing stream message"); + + } + } while (rval > 0); + close(msgsock); + } + close(sock); + unlink(NAME); +} + +void client_main() +{ + int i; + int sock; + struct sockaddr_un server; + struct timeval start_tv = {0, 0}; + struct timeval end_tv = {0, 0}; + struct timeval diff_tv = {0, 0}; + struct timeval prev_sum_tv = {0, 0}; + struct timeval sum_tv = {0, 0}; + int count = 0; + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + perror("opening stream socket"); + exit(1); + } + server.sun_family = AF_UNIX; + strcpy(server.sun_path, NAME); + + for (i = 0; i < BUFFER_SIZE; i++) + buffer[i] = 'a'; + buffer[i] = 0; + + if (connect(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) { + close(sock); + perror("connecting stream socket"); + exit(1); + } + for (i = 0; i < REPEAT_COUNT; i++) + { + + gettimeofday(&start_tv, NULL); + if (write(sock, buffer, BUFFER_SIZE) < 0) + perror("writing on stream socket"); + if (read(sock, buffer, BUFFER_SIZE) < 0) + perror("read on stream socket"); + gettimeofday(&end_tv, NULL); + MINUS_TIME(diff_tv, end_tv, start_tv); + PLUS_TIME(sum_tv, diff_tv, prev_sum_tv); + prev_sum_tv = sum_tv; + count++; + printf("start[%lu:%lu] end[%lu:%lu] diff[%lu:%lu]\n", + start_tv.tv_sec, start_tv.tv_usec, + end_tv.tv_sec, end_tv.tv_usec, + diff_tv.tv_sec, diff_tv.tv_usec); + + } + if (i == REPEAT_COUNT) + { + printf("sum[%lu:%lu] count[%d]\n", + sum_tv.tv_sec, sum_tv.tv_usec, count); + printf("avg[%lu:%lu]\n", + sum_tv.tv_sec / count, (sum_tv.tv_sec % count * 1000000 + sum_tv.tv_usec) / count); + } + + close(sock); +} + +int main(int argc, char *argv[]) +{ + if (argc != 2) + { + printf("Usage: %s client|server\n", argv[0]); + return -1; + } + if (strcmp(argv[1], "client") == 0) + { + printf("client mode..\n"); + client_main(); + } + else if (strcmp(argv[1], "server") == 0) + { + printf("server mode..\n"); + server_main(); + } + else + { + printf("Usage: %s client|server\n", argv[0]); + return -1; + } + return 0; +} + + diff --git a/test/test.c b/test/test.c new file mode 100644 index 0000000..6335a38 --- /dev/null +++ b/test/test.c @@ -0,0 +1,586 @@ +/* + * PIMS IPC + * + * Copyright (c) 2012 Samsung Electronics Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include + +#define THREAD_COUNT 1 +#define REPEAT_COUNT 100 +#define BUFFER_SIZE 1024 +#define PLUS_TIME(a, b, c) do {\ + a.tv_sec = b.tv_sec + c.tv_sec;\ + a.tv_usec = b.tv_usec + c.tv_usec;\ + if (a.tv_usec >= 1000000) {\ + a.tv_sec++;\ + a.tv_usec -= 1000000;\ + }\ +} while (0) + +#define MINUS_TIME(a, b, c) do {\ + a.tv_sec = b.tv_sec - c.tv_sec;\ + a.tv_usec = b.tv_usec - c.tv_usec;\ + if (a.tv_usec < 0) {\ + a.tv_sec--;\ + a.tv_usec += 1000000;\ + }\ +} while (0) + +typedef struct { + int thread_count; + int repeat_count; + int message_size; + pims_ipc_h ipc; +} test_arg_t; + +static gboolean __is_async = FALSE; +static gboolean __is_publish = FALSE; + +void test_function(pims_ipc_h ipc, pims_ipc_data_h indata, pims_ipc_data_h *outdata, void *userdata) +{ + unsigned int size = 0; + char *str = NULL; + + if (indata) + { + str = (char*)pims_ipc_data_get(indata, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + return; + } +#if 0 + str = (char*)pims_ipc_data_get(indata, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + return; + } + printf("%s\n", str); + str = (char*)pims_ipc_data_get(indata, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + return; + } + printf("%s\n", str); +#endif + } + + if (str && outdata) + { + *outdata = pims_ipc_data_create(0); + if (!*outdata) + { + printf("pims_ipc_data_create error\n"); + return; + } + if (pims_ipc_data_put(*outdata, str, strlen(str) + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + pims_ipc_data_destroy(*outdata); + *outdata = NULL; + return; + } +#if 0 + if (pims_ipc_data_put(*outdata, "welcome", strlen("welcome") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + pims_ipc_data_destroy(*outdata); + *outdata = NULL; + return; + } + if (pims_ipc_data_put(*outdata, "to the jungle", strlen("to the jungle") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + pims_ipc_data_destroy(*outdata); + *outdata = NULL; + return; + } +#endif + } +} + +static gboolean publish_callback(gpointer data) +{ + pims_ipc_data_h indata = NULL; + + indata = pims_ipc_data_create(0); + if (!indata) + { + printf("pims_ipc_data_create error\n"); + return false; + } + if (pims_ipc_data_put(indata, "publish test", strlen("publish test") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + return false; + } + if (pims_ipc_svc_publish("test_module", "publish", indata) != 0) + { + printf("pims_ipc_svc_publish error\n"); + return false; + } + + if (indata) + pims_ipc_data_destroy(indata); + + return true; +} + +int server_main() +{ + + pims_ipc_svc_init("pims-ipc-test", getuid(), 0660); + + if (pims_ipc_svc_register("test_module", "test_function", test_function, NULL) != 0) + { + printf("pims_ipc_svc_register error\n"); + return -1; + } + + if (__is_publish) + { + pims_ipc_svc_init_for_publish("pims-ipc-pub-test", getuid(), 0660); + g_timeout_add_seconds(3, publish_callback, NULL); + } + + pims_ipc_svc_run_main_loop(NULL); + + pims_ipc_svc_deinit(); + + return 0; +} + +static gboolean call_async_idler_callback(gpointer data); + +static void call_async_callback(pims_ipc_h ipc, pims_ipc_data_h data_out, void *userdata) +{ + unsigned int size = 0; + char *str = NULL; + + printf("(%x) call_async_callback(%p)", (unsigned int)pthread_self(), ipc); + if (data_out) + { + str = (char*)pims_ipc_data_get(data_out, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + } + else + { + printf("pims_ipc_data_get(%s) success\n", str); + } + } + + sleep(1); + + pims_ipc_data_h indata = NULL; + pims_ipc_data_h outdata = NULL; + + indata = pims_ipc_data_create(0); + if (!indata) + { + printf("pims_ipc_data_create error\n"); + return; + } + if (pims_ipc_data_put(indata, "hello world", strlen("hello world") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + return; + } + + if (pims_ipc_call(ipc, "test_module", "test_function", indata, &outdata) != 0) + { + printf("pims_ipc_call error\n"); + return; + } + + if (indata) + pims_ipc_data_destroy(indata); + if (outdata) + pims_ipc_data_destroy(outdata); + + sleep(1); + + call_async_idler_callback(ipc); +} + +static gboolean call_async_idler_callback(gpointer data) +{ + pims_ipc_data_h indata = NULL; + pims_ipc_data_h outdata = NULL; + pims_ipc_h ipc = data; + bool ret = false; + + indata = pims_ipc_data_create(0); + if (!indata) + { + printf("pims_ipc_data_create error\n"); + return FALSE; + } + if (pims_ipc_data_put(indata, "hello world", strlen("hello world") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + return FALSE; + } + + printf("(%x) call_async_idler_callback(%p)\n", (unsigned int)pthread_self(), ipc); + ret = pims_ipc_is_call_in_progress(ipc); + printf("(%x) before async call : pims_ipc_is_call_in_progress(%p) = %d\n", (unsigned int)pthread_self(), ipc, ret); + if (pims_ipc_call_async(ipc, "test_module", "test_function", indata, call_async_callback, NULL) != 0) + { + printf("pims_ipc_call_async error\n"); + return FALSE; + } + ret = pims_ipc_is_call_in_progress(ipc); + printf("(%x) after async call : pims_ipc_is_call_in_progress(%p) = %d\n", (unsigned int)pthread_self(), ipc, ret); + + if (pims_ipc_call(ipc, "test_module", "test_function", indata, &outdata) != 0) + { + printf("pims_ipc_call error during async-call\n"); + return FALSE; + } + if (indata) + pims_ipc_data_destroy(indata); + if (outdata) + pims_ipc_data_destroy(outdata); + + return FALSE; +} + +int client_main(pims_ipc_h ipc, int repeat_count, int message_size) +{ + pims_ipc_data_h indata = NULL; + pims_ipc_data_h outdata = NULL; + int retval = -1; + unsigned int size = 0; + char *str = NULL; + int i = 0; + struct timeval start_tv = {0, 0}; + struct timeval end_tv = {0, 0}; + struct timeval diff_tv = {0, 0}; + struct timeval prev_sum_tv = {0, 0}; + struct timeval sum_tv = {0, 0}; + int count = 0; + + char *buffer = g_malloc0(message_size + 1); + + for (i = 0; i < message_size; i++) + buffer[i] = 'a'; + buffer[i] = 0; + + for (i = 0; i < repeat_count; i++) + { + gettimeofday(&start_tv, NULL); + indata = pims_ipc_data_create(0); + if (!indata) + { + printf("pims_ipc_data_create error\n"); + break; + } + if (pims_ipc_data_put(indata, buffer, strlen(buffer) + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + break; + } +#if 0 + if (pims_ipc_data_put(indata, "hellow", strlen("hellow") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + break; + } + if (pims_ipc_data_put(indata, "world", strlen("world") + 1) != 0) + { + printf("pims_ipc_data_put error\n"); + break; + } +#endif + if (pims_ipc_call(ipc, "test_module", "test_function", indata, &outdata) != 0) + { + printf("pims_ipc_call error\n"); + break; + } + pims_ipc_data_destroy(indata); + indata = NULL; + if (outdata) + { + str = (char*)pims_ipc_data_get(outdata, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + break; + } +#if 0 + str = (char*)pims_ipc_data_get(outdata, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + break; + } + printf("%s\n", str); + str = (char*)pims_ipc_data_get(outdata, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + break; + } + printf("%s\n", str); +#endif + + pims_ipc_data_destroy(outdata); + outdata = NULL; + } + gettimeofday(&end_tv, NULL); + MINUS_TIME(diff_tv, end_tv, start_tv); + PLUS_TIME(sum_tv, diff_tv, prev_sum_tv); + prev_sum_tv = sum_tv; + count++; + printf("(%x) start[%lu:%lu] end[%lu:%lu] diff[%lu:%lu]\n", + (unsigned int)pthread_self(), + start_tv.tv_sec, start_tv.tv_usec, + end_tv.tv_sec, end_tv.tv_usec, + diff_tv.tv_sec, diff_tv.tv_usec); + } + + if (i == repeat_count) + { + printf("(%x) sum[%lu:%lu] count[%d]\n", + (unsigned int)pthread_self(), + sum_tv.tv_sec, sum_tv.tv_usec, count); + printf("(%x) avg[%lu:%lu]\n", + (unsigned int)pthread_self(), + sum_tv.tv_sec / count, (sum_tv.tv_sec % count * 1000000 + sum_tv.tv_usec) / count); + retval = 0; + } + + if (indata) + pims_ipc_data_destroy(indata); + if (outdata) + pims_ipc_data_destroy(outdata); + + if (__is_async) + g_idle_add(call_async_idler_callback, ipc); + + return retval; +} + +static void* __worker_task(void *arg) +{ + test_arg_t *_arg = arg; + printf("(%x) worker %p, %d, %d\n", (unsigned int)pthread_self(), _arg->ipc, _arg->repeat_count, _arg->message_size); + client_main(_arg->ipc, _arg->repeat_count, _arg->message_size); + printf("worker task has been done\n"); + return NULL; +} + +static gboolean __launch_worker(gpointer data) +{ + test_arg_t *_arg = data; + int i = 0; + GThread **worker_threads = NULL; + gboolean joinable = FALSE; + + worker_threads = g_new0(GThread*, _arg->thread_count); + if (!__is_async && !__is_publish) + joinable = TRUE; + + for (i = 0; i < _arg->thread_count; i++) + { + pims_ipc_h ipc = NULL; + test_arg_t *arg = g_new0(test_arg_t, 1); + arg->repeat_count = _arg->repeat_count; + arg->message_size = _arg->message_size; + ipc = pims_ipc_create("pims-ipc-test"); + if (!ipc) + { + printf("pims_ipc_create error\n"); + return -1; + } + arg->ipc = ipc; + + worker_threads[i] = g_thread_create(__worker_task, (void*)arg, joinable, NULL); + } + + if (!__is_async && !__is_publish) + { + for (i = 0; i < _arg->thread_count; i++) + { + g_thread_join(worker_threads[i]); + } + } + + g_free(worker_threads); + + return FALSE; +} + +static void subscribe_callback(pims_ipc_h ipc, pims_ipc_data_h data, void *userdata) +{ + unsigned int size = 0; + char *str = NULL; + + printf("(%x) subscribe_callback(%p)", (unsigned int)pthread_self(), ipc); + if (data) + { + str = (char*)pims_ipc_data_get(data, &size); + if (!str) + { + printf("pims_ipc_data_get error\n"); + } + else + { + printf("pims_ipc_data_get(%s) success\n", str); + } + } +} + +static void __print_usage(char *prog) +{ + printf("Usage: %s [-r -s -t -a] client|server \n", prog); +} + +int main(int argc, char *argv[]) +{ + int repeat_count = REPEAT_COUNT; + int message_size = BUFFER_SIZE; + int thread_count = THREAD_COUNT; + int c = 0; + test_arg_t arg; + + opterr = 0; + optind = 0; + + g_thread_init(NULL); + + while ((c = getopt(argc, argv, "r:s:t:ap")) != -1) + { + switch (c) + { + case 'r': + repeat_count = atoi(optarg); + break; + case 's': + message_size = atoi(optarg); + break; + case 't': + thread_count = atoi(optarg); + break; + case 'a': + __is_async = TRUE; + break; + case 'p': + __is_publish = TRUE; + break; + case '?': + __print_usage(argv[0]); + return -1; + } + + } + + if (argc - optind != 1) + { + __print_usage(argv[0]); + return -1; + } + + if (strcmp(argv[optind], "client") == 0) + { + // async call + GMainLoop* main_loop = g_main_loop_new(NULL, FALSE); + + printf("client mode.. with %d threads\n", thread_count); + + if (__is_publish) + { + pims_ipc_h ipc = NULL; + ipc = pims_ipc_create_for_subscribe("pims-ipc-pub-test"); + if (!ipc) + { + printf("pims_ipc_create_for_subscribe error\n"); + return -1; + } + pims_ipc_destroy_for_subscribe(ipc); + ipc = pims_ipc_create_for_subscribe("pims-ipc-pub-test"); + if (!ipc) + { + printf("pims_ipc_create_for_subscribe error\n"); + return -1; + } + if (pims_ipc_subscribe(ipc, "test_module", "publish", subscribe_callback, NULL) != 0) + { + printf("pims_ipc_subscribe error\n"); + return -1; + } + if (pims_ipc_unsubscribe(ipc, "test_module", "publish") != 0) + { + printf("pims_ipc_unsubscribe error\n"); + return -1; + } + if (pims_ipc_subscribe(ipc, "test_module", "publish", subscribe_callback, NULL) != 0) + { + printf("pims_ipc_subscribe error\n"); + return -1; + } + } + + if (thread_count <= 1) + { + pims_ipc_h ipc = NULL; + ipc = pims_ipc_create("pims-ipc-test"); + if (!ipc) + { + printf("pims_ipc_create error\n"); + return -1; + } + + client_main(ipc, repeat_count, message_size); + } + else + { + arg.message_size = message_size; + arg.repeat_count = repeat_count; + arg.thread_count = thread_count; + if (__is_async || __is_publish) + g_idle_add(__launch_worker, &arg); + else + __launch_worker(&arg); + } + + if (__is_async || __is_publish) + g_main_loop_run(main_loop); + } + else if (strcmp(argv[optind], "server") == 0) + { + printf("server mode..\n"); + server_main(); + } + else + { + __print_usage(argv[0]); + return -1; + } + return 0; +} -- 2.7.4