Tizen 2.1 base
authorJinkun Jang <jinkun.jang@samsung.com>
Tue, 12 Mar 2013 16:51:21 +0000 (01:51 +0900)
committerJinkun Jang <jinkun.jang@samsung.com>
Tue, 12 Mar 2013 16:51:21 +0000 (01:51 +0900)
20 files changed:
CMakeLists.txt [new file with mode: 0755]
LICENSE.APLv2 [new file with mode: 0644]
NOTICE [new file with mode: 0644]
include/pims-ipc-data.h [new file with mode: 0644]
include/pims-ipc-svc.h [new file with mode: 0644]
include/pims-ipc-types.h [new file with mode: 0644]
include/pims-ipc.h [new file with mode: 0644]
packaging/pims-ipc.spec [new file with mode: 0644]
pims-ipc.manifest [new file with mode: 0644]
pims-ipc.pc.in [new file with mode: 0755]
src/pims-debug.h [new file with mode: 0644]
src/pims-internal.h [new file with mode: 0644]
src/pims-ipc-data.c [new file with mode: 0644]
src/pims-ipc-svc.c [new file with mode: 0644]
src/pims-ipc.c [new file with mode: 0644]
src/pims-socket.c [new file with mode: 0644]
src/pims-socket.h [new file with mode: 0644]
test/CMakeLists.txt [new file with mode: 0755]
test/sock-test.c [new file with mode: 0644]
test/test.c [new file with mode: 0644]

diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100755 (executable)
index 0000000..4041dfc
--- /dev/null
@@ -0,0 +1,54 @@
+CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
+PROJECT(pims-ipc C)
+
+#IF("${CMAKE_BUILD_TYPE}" STREQUAL "")
+#      SET(CMAKE_BUILD_TYPE "Release")
+#ENDIF("${CMAKE_BUILD_TYPE}" STREQUAL "")
+#MESSAGE("Build type: ${CMAKE_BUILD_TYPE}")
+#SET(CMAKE_BUILD_TYPE "Debug")
+
+SET(DEST_INCLUDE_DIR "include/pims-ipc")
+SET(SRC_INCLUDE_DIR "${CMAKE_SOURCE_DIR}/include")
+
+SET(PREFIX ${CMAKE_INSTALL_PREFIX})
+SET(EXEC_PREFIX "\${prefix}")
+SET(LIBDIR "\${prefix}/lib")
+SET(INCLUDEDIR "\${prefix}/${DEST_INCLUDE_DIR}")
+SET(VERSION_MAJOR 0)
+SET(VERSION "${VERSION_MAJOR}.0.1")
+
+INCLUDE_DIRECTORIES(${SRC_INCLUDE_DIR})
+#SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} -I${CMAKE_SOURCE_DIR}/src -I${SRC_INCLUDE_DIR} -D_NON_SLP")
+SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} -I${CMAKE_SOURCE_DIR}/src -I${SRC_INCLUDE_DIR}")
+
+FILE(GLOB SRCS src/*.c)
+
+INCLUDE(FindPkgConfig)
+#pkg_check_modules(pkgs REQUIRED glib-2.0 gthread-2.0 libzmq)
+pkg_check_modules(pkgs REQUIRED glib-2.0 gthread-2.0 dlog libsystemd-daemon libzmq)
+
+FOREACH(flag ${pkgs_CFLAGS})
+       SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} ${flag}")
+ENDFOREACH(flag)
+
+SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} -fvisibility=hidden")
+
+SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${EXTRA_CFLAGS}")
+
+ADD_DEFINITIONS("-DPREFIX=\"${PREFIX}\"")
+
+ADD_LIBRARY(${PROJECT_NAME} SHARED ${SRCS})
+SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES SOVERSION ${VERSION_MAJOR})
+SET_TARGET_PROPERTIES(${PROJECT_NAME} PROPERTIES VERSION ${VERSION})
+TARGET_LINK_LIBRARIES(${PROJECT_NAME} ${pkgs_LDFLAGS})
+
+CONFIGURE_FILE(${PROJECT_NAME}.pc.in ${PROJECT_NAME}.pc @ONLY)
+SET_DIRECTORY_PROPERTIES(PROPERTIES ADDITIONAL_MAKE_CLEAN_FILES "${PROJECT_NAME}.pc")
+
+INSTALL(TARGETS ${PROJECT_NAME} DESTINATION lib)
+INSTALL(FILES ${PROJECT_NAME}.pc DESTINATION lib/pkgconfig)
+
+FILE(GLOB HEADER_FILES ${SRC_INCLUDE_DIR}/*.h)
+INSTALL(FILES ${HEADER_FILES} DESTINATION ${DEST_INCLUDE_DIR})
+
+ADD_SUBDIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/test)
diff --git a/LICENSE.APLv2 b/LICENSE.APLv2
new file mode 100644 (file)
index 0000000..bae7f54
--- /dev/null
@@ -0,0 +1,204 @@
+Copyright (c) 2000 - 2012 Samsung Electronics Co., Ltd. All rights reserved.\r
+\r
+                                 Apache License\r
+                           Version 2.0, January 2004\r
+                        http://www.apache.org/licenses/\r
+\r
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION\r
+\r
+   1. Definitions.\r
+\r
+      "License" shall mean the terms and conditions for use, reproduction,\r
+      and distribution as defined by Sections 1 through 9 of this document.\r
+\r
+      "Licensor" shall mean the copyright owner or entity authorized by\r
+      the copyright owner that is granting the License.\r
+\r
+      "Legal Entity" shall mean the union of the acting entity and all\r
+      other entities that control, are controlled by, or are under common\r
+      control with that entity. For the purposes of this definition,\r
+      "control" means (i) the power, direct or indirect, to cause the\r
+      direction or management of such entity, whether by contract or\r
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the\r
+      outstanding shares, or (iii) beneficial ownership of such entity.\r
+\r
+      "You" (or "Your") shall mean an individual or Legal Entity\r
+      exercising permissions granted by this License.\r
+\r
+      "Source" form shall mean the preferred form for making modifications,\r
+      including but not limited to software source code, documentation\r
+      source, and configuration files.\r
+\r
+      "Object" form shall mean any form resulting from mechanical\r
+      transformation or translation of a Source form, including but\r
+      not limited to compiled object code, generated documentation,\r
+      and conversions to other media types.\r
+\r
+      "Work" shall mean the work of authorship, whether in Source or\r
+      Object form, made available under the License, as indicated by a\r
+      copyright notice that is included in or attached to the work\r
+      (an example is provided in the Appendix below).\r
+\r
+      "Derivative Works" shall mean any work, whether in Source or Object\r
+      form, that is based on (or derived from) the Work and for which the\r
+      editorial revisions, annotations, elaborations, or other modifications\r
+      represent, as a whole, an original work of authorship. For the purposes\r
+      of this License, Derivative Works shall not include works that remain\r
+      separable from, or merely link (or bind by name) to the interfaces of,\r
+      the Work and Derivative Works thereof.\r
+\r
+      "Contribution" shall mean any work of authorship, including\r
+      the original version of the Work and any modifications or additions\r
+      to that Work or Derivative Works thereof, that is intentionally\r
+      submitted to Licensor for inclusion in the Work by the copyright owner\r
+      or by an individual or Legal Entity authorized to submit on behalf of\r
+      the copyright owner. For the purposes of this definition, "submitted"\r
+      means any form of electronic, verbal, or written communication sent\r
+      to the Licensor or its representatives, including but not limited to\r
+      communication on electronic mailing lists, source code control systems,\r
+      and issue tracking systems that are managed by, or on behalf of, the\r
+      Licensor for the purpose of discussing and improving the Work, but\r
+      excluding communication that is conspicuously marked or otherwise\r
+      designated in writing by the copyright owner as "Not a Contribution."\r
+\r
+      "Contributor" shall mean Licensor and any individual or Legal Entity\r
+      on behalf of whom a Contribution has been received by Licensor and\r
+      subsequently incorporated within the Work.\r
+\r
+   2. Grant of Copyright License. Subject to the terms and conditions of\r
+      this License, each Contributor hereby grants to You a perpetual,\r
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable\r
+      copyright license to reproduce, prepare Derivative Works of,\r
+      publicly display, publicly perform, sublicense, and distribute the\r
+      Work and such Derivative Works in Source or Object form.\r
+\r
+   3. Grant of Patent License. Subject to the terms and conditions of\r
+      this License, each Contributor hereby grants to You a perpetual,\r
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable\r
+      (except as stated in this section) patent license to make, have made,\r
+      use, offer to sell, sell, import, and otherwise transfer the Work,\r
+      where such license applies only to those patent claims licensable\r
+      by such Contributor that are necessarily infringed by their\r
+      Contribution(s) alone or by combination of their Contribution(s)\r
+      with the Work to which such Contribution(s) was submitted. If You\r
+      institute patent litigation against any entity (including a\r
+      cross-claim or counterclaim in a lawsuit) alleging that the Work\r
+      or a Contribution incorporated within the Work constitutes direct\r
+      or contributory patent infringement, then any patent licenses\r
+      granted to You under this License for that Work shall terminate\r
+      as of the date such litigation is filed.\r
+\r
+   4. Redistribution. You may reproduce and distribute copies of the\r
+      Work or Derivative Works thereof in any medium, with or without\r
+      modifications, and in Source or Object form, provided that You\r
+      meet the following conditions:\r
+\r
+      (a) You must give any other recipients of the Work or\r
+          Derivative Works a copy of this License; and\r
+\r
+      (b) You must cause any modified files to carry prominent notices\r
+          stating that You changed the files; and\r
+\r
+      (c) You must retain, in the Source form of any Derivative Works\r
+          that You distribute, all copyright, patent, trademark, and\r
+          attribution notices from the Source form of the Work,\r
+          excluding those notices that do not pertain to any part of\r
+          the Derivative Works; and\r
+\r
+      (d) If the Work includes a "NOTICE" text file as part of its\r
+          distribution, then any Derivative Works that You distribute must\r
+          include a readable copy of the attribution notices contained\r
+          within such NOTICE file, excluding those notices that do not\r
+          pertain to any part of the Derivative Works, in at least one\r
+          of the following places: within a NOTICE text file distributed\r
+          as part of the Derivative Works; within the Source form or\r
+          documentation, if provided along with the Derivative Works; or,\r
+          within a display generated by the Derivative Works, if and\r
+          wherever such third-party notices normally appear. The contents\r
+          of the NOTICE file are for informational purposes only and\r
+          do not modify the License. You may add Your own attribution\r
+          notices within Derivative Works that You distribute, alongside\r
+          or as an addendum to the NOTICE text from the Work, provided\r
+          that such additional attribution notices cannot be construed\r
+          as modifying the License.\r
+\r
+      You may add Your own copyright statement to Your modifications and\r
+      may provide additional or different license terms and conditions\r
+      for use, reproduction, or distribution of Your modifications, or\r
+      for any such Derivative Works as a whole, provided Your use,\r
+      reproduction, and distribution of the Work otherwise complies with\r
+      the conditions stated in this License.\r
+\r
+   5. Submission of Contributions. Unless You explicitly state otherwise,\r
+      any Contribution intentionally submitted for inclusion in the Work\r
+      by You to the Licensor shall be under the terms and conditions of\r
+      this License, without any additional terms or conditions.\r
+      Notwithstanding the above, nothing herein shall supersede or modify\r
+      the terms of any separate license agreement you may have executed\r
+      with Licensor regarding such Contributions.\r
+\r
+   6. Trademarks. This License does not grant permission to use the trade\r
+      names, trademarks, service marks, or product names of the Licensor,\r
+      except as required for reasonable and customary use in describing the\r
+      origin of the Work and reproducing the content of the NOTICE file.\r
+\r
+   7. Disclaimer of Warranty. Unless required by applicable law or\r
+      agreed to in writing, Licensor provides the Work (and each\r
+      Contributor provides its Contributions) on an "AS IS" BASIS,\r
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or\r
+      implied, including, without limitation, any warranties or conditions\r
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A\r
+      PARTICULAR PURPOSE. You are solely responsible for determining the\r
+      appropriateness of using or redistributing the Work and assume any\r
+      risks associated with Your exercise of permissions under this License.\r
+\r
+   8. Limitation of Liability. In no event and under no legal theory,\r
+      whether in tort (including negligence), contract, or otherwise,\r
+      unless required by applicable law (such as deliberate and grossly\r
+      negligent acts) or agreed to in writing, shall any Contributor be\r
+      liable to You for damages, including any direct, indirect, special,\r
+      incidental, or consequential damages of any character arising as a\r
+      result of this License or out of the use or inability to use the\r
+      Work (including but not limited to damages for loss of goodwill,\r
+      work stoppage, computer failure or malfunction, or any and all\r
+      other commercial damages or losses), even if such Contributor\r
+      has been advised of the possibility of such damages.\r
+\r
+   9. Accepting Warranty or Additional Liability. While redistributing\r
+      the Work or Derivative Works thereof, You may choose to offer,\r
+      and charge a fee for, acceptance of support, warranty, indemnity,\r
+      or other liability obligations and/or rights consistent with this\r
+      License. However, in accepting such obligations, You may act only\r
+      on Your own behalf and on Your sole responsibility, not on behalf\r
+      of any other Contributor, and only if You agree to indemnify,\r
+      defend, and hold each Contributor harmless for any liability\r
+      incurred by, or claims asserted against, such Contributor by reason\r
+      of your accepting any such warranty or additional liability.\r
+\r
+   END OF TERMS AND CONDITIONS\r
+\r
+   APPENDIX: How to apply the Apache License to your work.\r
+\r
+      To apply the Apache License to your work, attach the following\r
+      boilerplate notice, with the fields enclosed by brackets "[]"\r
+      replaced with your own identifying information. (Don't include\r
+      the brackets!)  The text should be enclosed in the appropriate\r
+      comment syntax for the file format. We also recommend that a\r
+      file or class name and description of purpose be included on the\r
+      same "printed page" as the copyright notice for easier\r
+      identification within third-party archives.\r
+\r
+   Copyright [yyyy] [name of copyright owner]\r
+\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+       http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+\r
diff --git a/NOTICE b/NOTICE
new file mode 100644 (file)
index 0000000..4c49449
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1 @@
+Copyright (c) 2000 - 2012 Samsung Electronics Co., Ltd. All rights reserved.
diff --git a/include/pims-ipc-data.h b/include/pims-ipc-data.h
new file mode 100644 (file)
index 0000000..e3bc131
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_IPC_DATA_H__
+#define __PIMS_IPC_DATA_H__
+
+#include <pims-ipc-types.h>
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+#define pims_ipc_data_create(flags) pims_ipc_data_create_with_size(1024, (flags))
+
+pims_ipc_data_h pims_ipc_data_create_with_size(unsigned int size, int flags);
+void pims_ipc_data_destroy(pims_ipc_data_h ipc);
+int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size);
+void* pims_ipc_data_get(pims_ipc_data_h data, unsigned int *size);
+void* pims_ipc_data_get_dup(pims_ipc_data_h data, unsigned int *size);
+int pims_ipc_data_put_with_type(pims_ipc_data_h data, pims_ipc_data_type_e type, void *buf, unsigned int size);
+void* pims_ipc_data_get_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size);
+void* pims_ipc_data_get_dup_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size);
+
+void* pims_ipc_data_marshal(pims_ipc_data_h data, unsigned int *size);
+int pims_ipc_data_marshal_with_zmq(pims_ipc_data_h data, zmq_msg_t *pzmsg);
+void* pims_ipc_data_marshal_dup(pims_ipc_data_h data, unsigned int *size);
+pims_ipc_data_h pims_ipc_data_unmarshal(void *buf, unsigned int size);
+pims_ipc_data_h pims_ipc_data_unmarshal_with_zmq(zmq_msg_t *pzmsg);
+pims_ipc_data_h pims_ipc_data_unmarshal_dup(void *buf, unsigned int size);
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /* __PIMS_IPC_DATA_H__ */
diff --git a/include/pims-ipc-svc.h b/include/pims-ipc-svc.h
new file mode 100644 (file)
index 0000000..0f85bb8
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_IPC_SVC_H__
+#define __PIMS_IPC_SVC_H__
+
+#include <pims-ipc-types.h>
+#include <sys/stat.h>
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+int pims_ipc_svc_init(char *service, gid_t group, mode_t mode);
+int pims_ipc_svc_deinit(void);
+int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata);
+
+int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode);
+int pims_ipc_svc_deinit_for_publish(void);
+int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data);
+
+void pims_ipc_svc_run_main_loop(GMainLoop* main_loop);
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /*__PIMS_IPC_IMPL_H__*/
+
diff --git a/include/pims-ipc-types.h b/include/pims-ipc-types.h
new file mode 100644 (file)
index 0000000..3244c34
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_IPC_TYPES_H__
+#define __PIMS_IPC_TYPES_H__
+
+#include <zmq.h>
+#include <glib.h>
+#include <stdbool.h>
+
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+#define PIMS_IPC_DATA_FLAGS_NONE        0x00000000
+#define PIMS_IPC_DATA_FLAGS_WITH_TYPE   0x00000001
+
+typedef void* pims_ipc_h;
+typedef void* pims_ipc_data_h;
+
+typedef enum {
+    PIMS_IPC_DATA_TYPE_INVALID,
+    PIMS_IPC_DATA_TYPE_CHAR,
+    PIMS_IPC_DATA_TYPE_UCHAR,
+    PIMS_IPC_DATA_TYPE_INT,
+    PIMS_IPC_DATA_TYPE_UINT,
+    PIMS_IPC_DATA_TYPE_LONG,
+    PIMS_IPC_DATA_TYPE_ULONG,
+    PIMS_IPC_DATA_TYPE_FLOAT,
+    PIMS_IPC_DATA_TYPE_DOUBLE,
+    PIMS_IPC_DATA_TYPE_STRING,
+    PIMS_IPC_DATA_TYPE_MEMORY,
+} pims_ipc_data_type_e;
+
+typedef void (*pims_ipc_svc_call_cb)(pims_ipc_h ipc, pims_ipc_data_h data_in, 
+                                    pims_ipc_data_h *data_out, void *userdata);
+typedef void (*pims_ipc_call_async_cb)(pims_ipc_h ipc, pims_ipc_data_h data_out, void *userdata);
+typedef void (*pims_ipc_subscribe_cb)(pims_ipc_h ipc, pims_ipc_data_h data, void *userdata);
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /* __PIMS_IPC_TYPES_H__ */
+
diff --git a/include/pims-ipc.h b/include/pims-ipc.h
new file mode 100644 (file)
index 0000000..f192ec9
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_IPC_H__
+#define __PIMS_IPC_H__
+
+#include <pims-ipc-types.h>
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+pims_ipc_h pims_ipc_create(char *service);
+void pims_ipc_destroy(pims_ipc_h ipc);
+int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
+                  pims_ipc_data_h *data_out);
+int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
+                        pims_ipc_call_async_cb callback, void *userdata);
+bool pims_ipc_is_call_in_progress(pims_ipc_h ipc);
+
+pims_ipc_h pims_ipc_create_for_subscribe(char *service);
+void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc);
+int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata);
+int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event);
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /*__PIMS_IPC_H__*/
+
diff --git a/packaging/pims-ipc.spec b/packaging/pims-ipc.spec
new file mode 100644 (file)
index 0000000..9cc428d
--- /dev/null
@@ -0,0 +1,59 @@
+Name:       pims-ipc
+Summary:    library for PIMs IPC
+Version:    0.0.20
+Release:    1
+Group:      System/Libraries
+License:    Apache 2.0
+Source0:    %{name}-%{version}.tar.gz
+Requires(post): /sbin/ldconfig
+Requires(post): /usr/bin/sqlite3
+Requires(postun): /sbin/ldconfig
+
+BuildRequires: cmake
+BuildRequires: pkgconfig(glib-2.0)
+BuildRequires: pkgconfig(dlog)
+BuildRequires: pkgconfig(libsystemd-daemon)
+BuildRequires: pkgconfig(libzmq)
+
+%description
+library for PIMs IPC
+
+%package devel
+Summary:    DB library for calendar
+Group:      Development/Libraries
+Requires:   %{name} = %{version}-%{release}
+
+%description devel
+library for PIMs IPC (developement files)
+
+%prep
+%setup -q
+
+
+%build
+cmake . -DCMAKE_INSTALL_PREFIX=%{_prefix}
+
+
+make %{?jobs:-j%jobs}
+
+%install
+%make_install
+
+
+%post
+/sbin/ldconfig
+
+%postun -p /sbin/ldconfig
+
+
+%files
+%manifest pims-ipc.manifest
+%defattr(-,root,root,-)
+%{_libdir}/libpims-ipc.so.*
+
+%files devel
+%defattr(-,root,root,-)
+%{_includedir}/pims-ipc/*.h
+%{_libdir}/*.so
+%{_libdir}/pims_ipc_test
+%{_libdir}/pkgconfig/pims-ipc.pc
diff --git a/pims-ipc.manifest b/pims-ipc.manifest
new file mode 100644 (file)
index 0000000..44e6873
--- /dev/null
@@ -0,0 +1,12 @@
+<manifest>
+       <define>
+               <domain name="pims-ipc"/>
+       </define>
+       <assign>
+               <filesystem path="/usr/lib/libpims-ipc.so.1.0.1" label="_" />
+               <filesystem path="/usr/lib/libpims-ipc.so.1" label="_" />
+       </assign>
+       <request>
+               <domain name="pims-ipc" />
+       </request>
+</manifest>
diff --git a/pims-ipc.pc.in b/pims-ipc.pc.in
new file mode 100755 (executable)
index 0000000..e277e4d
--- /dev/null
@@ -0,0 +1,13 @@
+# Package Information for pkg-config
+
+prefix=@PREFIX@
+exec_prefix=@EXEC_PREFIX@
+libdir=@LIBDIR@
+includedir=@INCLUDEDIR@
+
+Name: @PROJECT_NAME@
+Description: @PROJECT_NAME@ library
+Version: @VERSION@
+Requires: glib-2.0 libzmq
+Libs: -L${libdir} -l@PROJECT_NAME@
+Cflags: -I${includedir}
diff --git a/src/pims-debug.h b/src/pims-debug.h
new file mode 100644 (file)
index 0000000..0876864
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_DEBUG_H__
+#define __PIMS_DEBUG_H__
+
+#ifndef _NON_SLP
+#include <dlog.h>
+#endif
+#include <assert.h>
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+/* Tag defines */
+#define TAG_IPC     "PIMS_IPC"
+
+/* debug base macro */
+#ifndef _NON_SLP
+#define __ug_log(logtype, tag, frmt, args...) \
+        do {LOG(logtype, tag, "[%s:%d] "frmt"\n", __FUNCTION__, __LINE__, ##args);} while (0)
+#else
+#define LOG_VERBOSE "VERBOSE"
+#define LOG_DEBUG   "DEBUG"
+#define LOG_INFO    "INFO"
+#define LOG_WARN    "WARN"
+#define LOG_ERROR   "ERROR"
+#define __ug_log(logtype, tag, frmt, args...) \
+        do {printf("[%s][%s][%08x][%s:%d] "frmt"\n", logtype, tag, (unsigned int)pthread_self(), __FUNCTION__, __LINE__, ##args);} while (0)
+#endif
+
+#define pims_verbose(tag, frmt, args...) __ug_log(LOG_VERBOSE, tag, frmt, ##args)
+#define pims_debug(tag, frmt, args...)   __ug_log(LOG_DEBUG,   tag, frmt, ##args)
+#define pims_info(tag, frmt, args...)    __ug_log(LOG_INFO,    tag, frmt, ##args)
+#define pims_warn(tag, frmt, args...)    __ug_log(LOG_WARN,    tag, frmt, ##args)
+#define pims_error(tag, frmt, args...)   __ug_log(LOG_ERROR,   tag, frmt, ##args)
+
+#ifndef TAG_NAME // SET default TAG
+#define TAG_NAME    TAG_IPC
+#endif
+
+#define PIMS_VERBOSE_TAG(frmt, args...) pims_verbose(TAG_NAME, frmt, ##args);
+#define PIMS_DEBUG_TAG(frmt, args...)   pims_debug  (TAG_NAME, frmt, ##args);
+#define PIMS_INFO_TAG(frmt, args...)    pims_info   (TAG_NAME, frmt, ##args);
+#define PIMS_WARN_TAG(frmt, args...)    pims_warn   (TAG_NAME, frmt, ##args);
+#define PIMS_ERROR_TAG(frmt, args...)   pims_error  (TAG_NAME, frmt, ##args);
+
+#define VERBOSE(frmt, args...)  PIMS_VERBOSE_TAG(frmt, ##args)
+#define DEBUG(frmt, args...)    PIMS_DEBUG_TAG(frmt, ##args)
+#define INFO(frmt, args...)     PIMS_INFO_TAG(frmt, ##args)
+#define WARNING(frmt, args...)  PIMS_WARN_TAG(frmt, ##args)
+#define ERROR(frmt, args...)    PIMS_ERROR_TAG(frmt, ##args)
+
+#define ASSERT(expr) \
+    if (!(expr)) \
+    {   \
+        ERROR("Assertion %s", #expr); \
+    } \
+    assert(expr)
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /* __PIMS_DEBUG_H__ */
diff --git a/src/pims-internal.h b/src/pims-internal.h
new file mode 100644 (file)
index 0000000..98de999
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_INTERNAL_H__
+#define __PIMS_INTERNAL_H__
+
+#include <zmq.h>
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+#ifndef API
+#define API __attribute__ ((visibility("default")))
+#endif
+
+#define PIMS_IPC_MONITOR_PATH       "monitor"
+#define PIMS_IPC_DEALER_PATH        "dealer"
+#define PIMS_IPC_MANAGER_PATH       "manager"
+#define PIMS_IPC_MONITOR2_PATH      "monitor2"
+#define PIMS_IPC_MODULE_INTERNAL    "pims_ipc_internal"
+#define PIMS_IPC_FUNCTION_CREATE    "create"
+#define PIMS_IPC_FUNCTION_DESTROY   "destroy"
+#define PIMS_IPC_CALL_ID_CREATE     PIMS_IPC_MODULE_INTERNAL ":" PIMS_IPC_FUNCTION_CREATE
+#define PIMS_IPC_CALL_ID_DESTROY    PIMS_IPC_MODULE_INTERNAL ":" PIMS_IPC_FUNCTION_DESTROY
+#define PIMS_IPC_MAKE_CALL_ID(module, function) g_strdup_printf("%s:%s", module, function)
+
+static inline int _pims_zmq_msg_recv(zmq_msg_t *msg, void *socket, int flags)
+{
+    int ret = -1;
+
+    while (1)
+    {
+        ret = zmq_msg_recv(msg, socket, flags);
+        if (ret == -1 && errno == EINTR)
+            continue;
+        break;
+    }
+    return ret;
+}
+
+static inline int _pims_zmq_msg_send(zmq_msg_t *msg, void *socket, int flags)
+{
+    int ret = -1;
+
+    while (1)
+    {
+        ret = zmq_msg_send(msg, socket, flags);
+        if (ret == -1 && errno == EINTR)
+            continue;
+        break;
+    }
+    return ret;
+}
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /* __PIMS_INTERNAL_H__ */
diff --git a/src/pims-ipc-data.c b/src/pims-ipc-data.c
new file mode 100644 (file)
index 0000000..86d39be
--- /dev/null
@@ -0,0 +1,503 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <glib.h>
+
+#include <pims-internal.h>
+#include <pims-debug.h>
+#include <pims-ipc-data.h>
+
+typedef struct
+{
+    unsigned int alloc_size;
+    unsigned int buf_size;
+    unsigned int free_size;
+    zmq_msg_t zmsg;
+    char *pos;
+    char *buf;
+    int flags;
+    unsigned int created:1;
+    unsigned int zmsg_alloced:1;
+    unsigned int buf_alloced:1;
+} pims_ipc_data_t;
+
+/*
+  Structure of data with type(4 bytes order)
+      +------------------------------------------------------------------+
+      | type | size | data        | pad | type | size | data       | pad |
+      +------------------------------------------------------------------+
+         4      4     size         0-3          (Size of bytes)
+
+  Structure of data without type(4 bytes order)
+      +----------------------------------------------------+
+      | size | data        | pad | size | data       | pad |
+      +----------------------------------------------------+
+         4     size         0-3        (Size of bytes)
+*/
+
+#define _get_used_size_with_type(data_size) \
+    (sizeof(int) + sizeof(int) + data_size + (sizeof(int) - (data_size % sizeof(int))))
+
+#define _get_used_size(data_size) \
+    (sizeof(int) + data_size + (sizeof(int) - (data_size % sizeof(int))))
+
+API pims_ipc_data_h pims_ipc_data_create_with_size(unsigned int size, int flags)
+{
+    int ret = -1;
+    pims_ipc_data_t *handle = NULL;
+
+    handle = g_new0(pims_ipc_data_t, 1);
+    handle->alloc_size = size;
+    handle->free_size = size;
+    handle->buf_size = 0;
+    handle->buf = g_malloc0(size);
+    handle->pos = handle->buf;
+    handle->created = 1;
+    handle->buf_alloced = 1;
+
+    ret = pims_ipc_data_put(handle, &flags, sizeof(int));
+    ASSERT(ret == 0);
+    handle->flags = flags;
+
+    return handle;
+}
+
+API void pims_ipc_data_destroy(pims_ipc_data_h data)
+{
+    pims_ipc_data_t *handle = (pims_ipc_data_t *)data;
+    if (handle->buf_alloced)
+    {
+        g_free(handle->buf);
+    }
+    if (handle->zmsg_alloced)
+    {
+        zmq_msg_close(&handle->zmsg);
+    }
+    g_free(handle);
+}
+
+API int pims_ipc_data_put(pims_ipc_data_h data, void *buf, unsigned int size)
+{
+    pims_ipc_data_t *handle = NULL;
+    unsigned int dsize = size;
+    unsigned int used_size = 0;
+    handle = (pims_ipc_data_t *)data;
+
+    if (handle->created == 0)
+    {
+        ERROR("This handle isn't create mode.");
+        return -1;
+    }
+    if (handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE)
+    {
+        ERROR("Not without-type mode");
+        return -1;
+    }
+    if (dsize > 0 && buf == NULL)
+    {
+        ERROR("Invalid argument");
+        return -1;
+    }
+
+    if (handle->free_size < _get_used_size(dsize))
+    {
+        int new_size = 0;
+        new_size = handle->alloc_size * 2;
+
+        while (new_size < handle->buf_size + _get_used_size(dsize))
+            new_size *= 2;
+        handle->buf = g_realloc(handle->buf, new_size);
+        handle->alloc_size = new_size;
+        handle->free_size = handle->alloc_size - handle->buf_size;
+
+        handle->pos = handle->buf;
+        handle->pos += handle->buf_size;
+
+        VERBOSE("free_size [%d] dsize [%d]", handle->free_size, dsize);
+    }
+
+    *(int*)(handle->pos) = dsize;
+    if (dsize > 0)
+        memcpy((handle->pos+sizeof(int)), buf, dsize);
+
+    used_size = _get_used_size(dsize);
+    handle->pos += used_size;
+    handle->buf_size += used_size;
+    handle->free_size -= used_size;
+    
+    VERBOSE("data put size [%u] data[%p]", dsize, buf);
+
+    return 0;
+}
+
+API void* pims_ipc_data_get(pims_ipc_data_h data, unsigned int *size)
+{
+    pims_ipc_data_t *handle = NULL;
+    unsigned int dsize = 0;
+    unsigned int used_size = 0;
+    void *buf = NULL;
+    handle = (pims_ipc_data_t *)data;
+
+    if (handle->created == 1)
+    {
+        ERROR("This handle is create mode.");
+        return NULL;
+    }
+    if (handle->buf_size == 0)
+    {
+        ERROR("Remain buffer size is 0.");
+        return NULL;
+    }
+    if (handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE)
+    {
+        ERROR("Not without-type mode");
+        return NULL;
+    }
+
+    dsize = *(int*)(handle->pos);
+    buf = (handle->pos+sizeof(int));
+
+    if (dsize == 0) // current position is next size field becasue data size is 0
+        buf = NULL;
+
+    used_size = _get_used_size(dsize);
+    handle->pos += used_size;
+    handle->buf_size -= used_size;
+    handle->free_size += used_size;
+
+    VERBOSE("data get size [%u] data[%p]", dsize, buf);
+    *size = dsize;
+    return buf;
+}
+
+API void* pims_ipc_data_get_dup(pims_ipc_data_h data, unsigned int *size)
+{
+    void *buf = NULL;
+
+    buf = pims_ipc_data_get(data, size);
+    return g_memdup(buf, *size);
+}
+
+
+API int pims_ipc_data_put_with_type(pims_ipc_data_h data, pims_ipc_data_type_e type, void *buf, unsigned int size)
+{
+    pims_ipc_data_t *handle = NULL;
+    unsigned int dsize = 0;
+    unsigned int used_size = 0;
+    handle = (pims_ipc_data_t *)data;
+
+    if (handle->created == 0)
+    {
+        ERROR("This handle isn't create mode.");
+        return -1;
+    }
+    if (!(handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE))
+    {
+        ERROR("Not with-type mode");
+        return -1;
+    }
+
+    switch(type)
+    {
+    case PIMS_IPC_DATA_TYPE_CHAR:
+        dsize = sizeof(char);
+        break;
+    case PIMS_IPC_DATA_TYPE_UCHAR:
+        dsize = sizeof(unsigned char);
+        break;
+    case PIMS_IPC_DATA_TYPE_INT:
+        dsize = sizeof(int);
+        break;
+    case PIMS_IPC_DATA_TYPE_UINT:
+        dsize = sizeof(unsigned int);
+        break;
+    case PIMS_IPC_DATA_TYPE_LONG:
+        dsize = sizeof(long);
+        break;
+    case PIMS_IPC_DATA_TYPE_ULONG:
+        dsize = sizeof(unsigned long);
+        break;
+    case PIMS_IPC_DATA_TYPE_FLOAT:
+        dsize = sizeof(float);
+        break;
+    case PIMS_IPC_DATA_TYPE_DOUBLE:
+        dsize = sizeof(double);
+        break;
+    case PIMS_IPC_DATA_TYPE_STRING:
+        if (buf == NULL)
+        {
+            dsize = 0;
+        }
+        else
+        {
+            dsize = strlen(buf) +1;
+        }
+        break;
+    case PIMS_IPC_DATA_TYPE_MEMORY:
+        dsize = size;
+        break;
+    default:
+        dsize = 0;
+        break;
+    }
+
+    if (dsize != 0 && buf == NULL)
+        return -1;
+
+    if (buf == NULL && dsize == 0 && type != PIMS_IPC_DATA_TYPE_STRING)
+        return -1;
+
+    if (handle->free_size < _get_used_size_with_type(dsize))
+    {
+        int new_size = 0;
+        new_size = handle->alloc_size * 2;
+
+        while (new_size < handle->buf_size + _get_used_size_with_type(dsize))
+            new_size *= 2;
+        handle->buf = g_realloc(handle->buf, new_size);
+        handle->alloc_size = new_size;
+        handle->free_size = handle->alloc_size - handle->buf_size;
+
+        handle->pos = handle->buf;
+        handle->pos += handle->buf_size;
+
+        VERBOSE("free_size [%d] dsize [%d]", handle->free_size, dsize);
+    }
+
+    *(int*)(handle->pos) = type;
+    *(int*)(handle->pos+sizeof(int)) = dsize;
+    if (dsize > 0 && buf != NULL)
+        memcpy((handle->pos+sizeof(int)*2), buf, dsize);
+
+    used_size = _get_used_size_with_type(dsize);
+    handle->pos += used_size;
+    handle->buf_size += used_size;
+    handle->free_size -= used_size;
+    
+    VERBOSE("data put type [%d] size [%u] data[%p]", type, dsize, buf);
+
+    return 0;
+}
+
+API void* pims_ipc_data_get_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size)
+{
+    pims_ipc_data_t *handle = NULL;
+    pims_ipc_data_type_e dtype = PIMS_IPC_DATA_TYPE_INVALID;
+    unsigned int dsize = 0;
+    unsigned int used_size = 0;
+    void *buf = NULL;
+    handle = (pims_ipc_data_t *)data;
+
+    if (handle->created == 1)
+    {
+        ERROR("This handle is create mode.");
+        *type = PIMS_IPC_DATA_TYPE_INVALID;
+        return NULL;
+    }
+    if (handle->buf_size == 0)
+    {
+        ERROR("Remain buffer size is 0.");
+        *type = PIMS_IPC_DATA_TYPE_INVALID;
+        return NULL;
+    }
+    if (!(handle->flags & PIMS_IPC_DATA_FLAGS_WITH_TYPE))
+    {
+        ERROR("Not with-type mode");
+        *type = PIMS_IPC_DATA_TYPE_INVALID;
+        return NULL;
+    }
+
+    dtype = *(int*)(handle->pos);
+    dsize = *(int*)(handle->pos+sizeof(int));
+    buf = (handle->pos+sizeof(int)*2);
+
+    switch(dtype)
+    {
+        case PIMS_IPC_DATA_TYPE_CHAR:
+        case PIMS_IPC_DATA_TYPE_UCHAR:
+        case PIMS_IPC_DATA_TYPE_INT:
+        case PIMS_IPC_DATA_TYPE_UINT:
+        case PIMS_IPC_DATA_TYPE_LONG:
+        case PIMS_IPC_DATA_TYPE_ULONG:
+        case PIMS_IPC_DATA_TYPE_FLOAT:
+        case PIMS_IPC_DATA_TYPE_DOUBLE:
+        case PIMS_IPC_DATA_TYPE_STRING:
+        case PIMS_IPC_DATA_TYPE_MEMORY:
+            break;
+        default:
+            ERROR("Unknown data type");
+            dsize = 0;
+            break;
+    }
+
+    if (dtype != PIMS_IPC_DATA_TYPE_STRING && dsize == 0)
+    {
+        *type = PIMS_IPC_DATA_TYPE_INVALID;
+        return NULL;
+    }
+
+    if (dsize == 0) // current position is next type field becasue data size is 0
+        buf = NULL;
+
+    used_size = _get_used_size_with_type(dsize);
+    handle->pos += used_size;
+    handle->buf_size -= used_size;
+    handle->free_size += used_size;
+
+    VERBOSE("data get type [%d] size [%u] data[%p]", dtype, dsize, buf);
+    *type = dtype;
+    *size = dsize;
+    return buf;
+}
+
+API void* pims_ipc_data_get_dup_with_type(pims_ipc_data_h data, pims_ipc_data_type_e *type, unsigned int *size)
+{
+    void *buf = NULL;
+
+    buf = pims_ipc_data_get_with_type(data, type, size);
+    return g_memdup(buf, *size);
+}
+
+API void* pims_ipc_data_marshal(pims_ipc_data_h data, unsigned int *size)
+{
+    pims_ipc_data_t *handle = NULL;
+    
+    if (!data || !size )
+        return NULL;
+    
+    handle = (pims_ipc_data_t *)data;
+
+    *size = handle->buf_size;
+
+    return handle->buf;
+}
+
+static void __pims_ipc_data_free_cb(void *data, void *hint)
+{
+    if (hint)
+        g_free(hint);
+}
+
+API int pims_ipc_data_marshal_with_zmq(pims_ipc_data_h data, zmq_msg_t *pzmsg)
+{
+    pims_ipc_data_t *handle = NULL;
+   
+    if (!data || !pzmsg )
+        return -1;
+
+    handle = (pims_ipc_data_t *)data;
+
+    if (handle->zmsg_alloced)
+    {
+        // TODO: need to prevent memory leackage when reusing data marshaled
+        WARNING("It's ever been marshaled");
+    }
+
+    zmq_msg_init_data(&handle->zmsg, handle->buf, handle->buf_size, __pims_ipc_data_free_cb, handle->buf); 
+    
+    if (zmq_msg_copy(pzmsg, &handle->zmsg) != 0)
+    {
+        zmq_msg_close(&handle->zmsg);
+        return -1;
+    }
+
+    handle->zmsg_alloced = 1;
+    handle->buf_alloced = 0;
+
+    return 0;
+}
+
+API void* pims_ipc_data_marshal_dup(pims_ipc_data_h data, unsigned int *size)
+{
+    unsigned int lsize = 0;
+    gpointer buf = NULL;
+    
+    if (!data || !size )
+        return NULL;
+    
+    buf = pims_ipc_data_marshal(data, &lsize);
+    *size = lsize;
+    return g_memdup(buf, lsize);
+}
+
+API pims_ipc_data_h pims_ipc_data_unmarshal(void *buf, unsigned int size)
+{
+    pims_ipc_data_t *handle = NULL;
+    zmq_msg_t zmsg; 
+
+    zmq_msg_init_data(&zmsg, buf, size, __pims_ipc_data_free_cb, NULL); 
+
+    handle = pims_ipc_data_unmarshal_with_zmq(&zmsg);
+    zmq_msg_close(&zmsg);
+
+    return handle;
+}
+
+API pims_ipc_data_h pims_ipc_data_unmarshal_with_zmq(zmq_msg_t *pzmsg)
+{
+    pims_ipc_data_t *handle = NULL;
+    unsigned int size = 0;
+    void *ptr = NULL;
+
+    handle = g_new0(pims_ipc_data_t, 1);
+    zmq_msg_init(&handle->zmsg);
+    if (zmq_msg_copy(&handle->zmsg, pzmsg) != 0)
+    {
+        g_free(handle);
+        return NULL;
+    }
+    handle->alloc_size = zmq_msg_size(&handle->zmsg);
+    handle->free_size = 0;
+    handle->buf_size = handle->alloc_size;
+    handle->buf = zmq_msg_data(&handle->zmsg);
+    handle->pos = handle->buf;
+    handle->created = 0;
+    handle->zmsg_alloced = 1;
+    
+    ptr = pims_ipc_data_get(handle, &size);
+    if (!ptr || size != sizeof(int))
+    {
+        g_free(handle);
+        return NULL;
+    }
+    handle->flags = *((int*)ptr);
+
+    VERBOSE("handle[%p] zmsg[%p] flags[%x]", handle, pzmsg, handle->flags);
+    
+    return handle;
+}
+
+API pims_ipc_data_h pims_ipc_data_unmarshal_dup(void *buf, unsigned int size)
+{
+    pims_ipc_data_t *handle = NULL;
+    zmq_msg_t zmsg; 
+
+    zmq_msg_init_size(&zmsg, size);
+    memcpy(zmq_msg_data(&zmsg), buf, size);
+
+    handle = pims_ipc_data_unmarshal_with_zmq(&zmsg);
+    zmq_msg_close(&zmsg);
+
+    return handle;
+}
diff --git a/src/pims-ipc-svc.c b/src/pims-ipc-svc.c
new file mode 100644 (file)
index 0000000..86a05c1
--- /dev/null
@@ -0,0 +1,1242 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <glib.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <pims-internal.h>
+#include <pims-debug.h>
+#include <pims-socket.h>
+#include <pims-ipc-data.h>
+#include <pims-ipc-svc.h>
+
+#define PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT  2
+
+typedef struct
+{
+    char *service;
+    gid_t group;
+    mode_t mode;
+    GHashTable *cb_table;
+    GHashTable *client_table;
+    GList *workers;
+    GList *requests;
+    int workers_max_count;
+    void* context;
+    void* router;
+    void* worker;
+    void* manager;
+    void* monitor;
+} pims_ipc_svc_t;
+
+typedef struct
+{
+    char *service;
+    gid_t group;
+    mode_t mode;
+    void* context;
+    void* publisher;
+} pims_ipc_svc_for_publish_t;
+
+
+typedef struct
+{
+    pims_ipc_svc_call_cb callback;
+    void * user_data;
+} pims_ipc_svc_cb_t;
+
+static pims_ipc_svc_t *_g_singleton = NULL;
+static pims_ipc_svc_for_publish_t *_g_singleton_for_publish = NULL;
+
+API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode)
+{
+    if (_g_singleton)
+    {
+        ERROR("Already exist");
+        return -1;
+    }
+
+    _g_singleton = g_new0(pims_ipc_svc_t, 1);
+    _g_singleton->service = g_strdup(service);
+    _g_singleton->group = group;
+    _g_singleton->mode = mode;
+    _g_singleton->workers_max_count = PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT;
+    _g_singleton->cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
+    ASSERT(_g_singleton->cb_table);
+    _g_singleton->client_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);
+    ASSERT(_g_singleton->client_table);
+
+    return 0;
+}
+
+static void __free_zmq_msg(gpointer data)
+{
+    zmq_msg_t *lpzmsg = data;
+
+    if (lpzmsg)
+    {
+        zmq_msg_close(lpzmsg);
+        g_free(lpzmsg);
+    }
+}
+
+API int pims_ipc_svc_deinit(void)
+{
+    if (!_g_singleton)
+        return -1;
+
+    g_free(_g_singleton->service);
+    g_hash_table_destroy(_g_singleton->cb_table);
+    g_hash_table_destroy(_g_singleton->client_table);
+    g_list_free(_g_singleton->workers);
+    g_list_free_full(_g_singleton->requests, __free_zmq_msg);
+    g_free(_g_singleton);
+    _g_singleton = NULL;
+
+    return 0;
+}
+
+API int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata)
+{
+    pims_ipc_svc_cb_t *cb_data = NULL;
+    gchar *call_id = NULL;
+
+    if (!module || !function || !callback)
+    {
+        ERROR("Invalid argument");
+        return -1;
+    }
+    cb_data = g_new0(pims_ipc_svc_cb_t, 1);
+    call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
+
+    VERBOSE("register cb id[%s]", call_id);
+    cb_data->callback = callback;
+    cb_data->user_data = userdata;
+    g_hash_table_insert(_g_singleton->cb_table, call_id, cb_data);
+
+    return 0;
+}
+
+API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode)
+{
+    if (_g_singleton_for_publish)
+    {
+        ERROR("Already exist");
+        return -1;
+    }
+
+    _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_t, 1);
+    _g_singleton_for_publish->service = g_strdup(service);
+    _g_singleton_for_publish->group = group;
+    _g_singleton_for_publish->mode = mode;
+
+    return 0;
+}
+
+API int pims_ipc_svc_deinit_for_publish(void)
+{
+    if (!_g_singleton_for_publish)
+        return -1;
+
+    g_free(_g_singleton_for_publish->service);
+    g_free(_g_singleton_for_publish);
+    _g_singleton_for_publish = NULL;
+
+    return 0;
+}
+
+static void __pims_ipc_svc_data_free_cb(void *data, void *hint)
+{
+    if (hint)
+        g_free(hint);
+}
+
+API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data)
+{
+    pims_ipc_svc_for_publish_t *ipc_svc = _g_singleton_for_publish;
+    gboolean is_valid = FALSE;
+    gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
+
+    // init messages
+    zmq_msg_t call_id_msg;
+    zmq_msg_t data_msg;
+
+    zmq_msg_init_data(&call_id_msg, call_id, strlen(call_id) + 1, __pims_ipc_svc_data_free_cb, call_id);
+    VERBOSE("call id = %s", (char*)zmq_msg_data(&call_id_msg));
+
+    zmq_msg_init(&data_msg);
+
+    do {
+        if (data == NULL)
+        {
+            // send call id
+            if (_pims_zmq_msg_send(&call_id_msg, ipc_svc->publisher, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+        else
+        {
+            // send call id
+            if (_pims_zmq_msg_send(&call_id_msg, ipc_svc->publisher, ZMQ_SNDMORE) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            // marshal data
+            if (pims_ipc_data_marshal_with_zmq(data, &data_msg) != 0)
+            {
+                ERROR("marshal error");
+                break;
+            }
+
+            VERBOSE("the size of sending data = %d", zmq_msg_size(&data_msg));
+
+            // send data
+            if (_pims_zmq_msg_send(&data_msg, ipc_svc->publisher, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+
+        is_valid = TRUE;
+    } while (0);
+
+    zmq_msg_close(&call_id_msg);
+    zmq_msg_close(&data_msg);
+
+    if (is_valid == FALSE)
+        return -1;
+    return 0;
+}
+
+static void __run_callback(int worker_id, char *call_id, pims_ipc_data_h dhandle_in, pims_ipc_data_h *dhandle_out)
+{
+    pims_ipc_svc_cb_t *cb_data = NULL;
+
+    VERBOSE("Call id [%s]", call_id);
+
+    cb_data = (pims_ipc_svc_cb_t*)g_hash_table_lookup(_g_singleton->cb_table, call_id);
+    if (cb_data == NULL)
+    {
+        VERBOSE("unable to find %s", call_id);
+        return;
+    }
+    
+    cb_data->callback((pims_ipc_h)worker_id, dhandle_in, dhandle_out, cb_data->user_data);
+}
+
+static int __process_worker_task(int worker_id, void *context, void *worker)
+{
+    gboolean is_create = FALSE;
+    gboolean is_destroy = FALSE;
+    char *pid = NULL;
+    char *call_id = NULL;
+    int64_t more = 0;
+    size_t more_size = sizeof(more);
+    pims_ipc_data_h dhandle_in = NULL;
+    pims_ipc_data_h dhandle_out = NULL;
+
+    VERBOSE("");
+
+#ifdef _TEST
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    printf("worker time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec);
+#endif
+
+    zmq_msg_t pid_msg;
+    zmq_msg_t sequence_no_msg;
+    zmq_msg_t call_id_msg;
+    zmq_msg_t data_msg;
+    
+    zmq_msg_init(&pid_msg);
+    zmq_msg_init(&sequence_no_msg);
+    zmq_msg_init(&call_id_msg);
+    zmq_msg_init(&data_msg);
+
+    do {
+        // read pid
+        if (_pims_zmq_msg_recv(&pid_msg, worker, 0) == -1)
+        {
+            ERROR("recv error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // read sequence no
+        if (_pims_zmq_msg_recv(&sequence_no_msg, worker, 0) == -1)
+        {
+            ERROR("recv error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // read call id
+        if (_pims_zmq_msg_recv(&call_id_msg, worker, 0) == -1)
+        {
+            ERROR("recv error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        more = 0;
+        zmq_getsockopt(worker, ZMQ_RCVMORE, &more, &more_size);
+        if (more)
+        {
+            // read data
+            if (_pims_zmq_msg_recv(&data_msg, worker, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            dhandle_in = pims_ipc_data_unmarshal_with_zmq(&data_msg);
+            if (dhandle_in == NULL)
+            {
+                ERROR("unmarshal error");
+                break;
+            }
+        }
+
+        pid = (char*)zmq_msg_data(&pid_msg);
+        ASSERT(pid);
+        VERBOSE("client pid = %s", pid);
+
+        call_id = (char*)zmq_msg_data(&call_id_msg);
+        ASSERT(call_id);
+        VERBOSE("call_id = [%s]", call_id);
+
+        // call a callback function with call id and data
+        if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0)
+        {
+            is_create = TRUE;
+        }
+        else if (strcmp(PIMS_IPC_CALL_ID_DESTROY, call_id) == 0)
+        {
+            is_destroy = TRUE;
+        }
+        else
+        {
+            __run_callback(worker_id, call_id, dhandle_in, &dhandle_out);
+        }
+
+        // send pid
+        if (_pims_zmq_msg_send(&pid_msg, worker, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // send empty 
+        zmq_msg_t empty_msg;
+        zmq_msg_init_size(&empty_msg, 0);
+        if (_pims_zmq_msg_send(&empty_msg, worker, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            zmq_msg_close(&empty_msg);
+            break;
+        }
+        zmq_msg_close(&empty_msg);
+
+        // send sequence no
+        if (_pims_zmq_msg_send(&sequence_no_msg, worker, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        if (dhandle_out)
+        {
+            // send call id
+            if (_pims_zmq_msg_send(&call_id_msg, worker, ZMQ_SNDMORE) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            // marshal data
+            zmq_msg_close(&data_msg);
+            zmq_msg_init(&data_msg);
+            if (pims_ipc_data_marshal_with_zmq(dhandle_out, &data_msg) != 0)
+            {
+                ERROR("marshal error");
+                break;
+            }
+
+            // send data
+            VERBOSE("the size of sending data = %d", zmq_msg_size(&data_msg));
+            if (_pims_zmq_msg_send(&data_msg, worker, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+        else
+        {
+            // send call id
+            if (_pims_zmq_msg_send(&call_id_msg, worker, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+    } while (0);
+
+    zmq_msg_close(&pid_msg);
+    zmq_msg_close(&sequence_no_msg);
+    zmq_msg_close(&call_id_msg);
+    zmq_msg_close(&data_msg);
+
+    if (dhandle_in)
+        pims_ipc_data_destroy(dhandle_in);
+    if (dhandle_out)
+        pims_ipc_data_destroy(dhandle_out);
+
+    VERBOSE("responsed");
+
+#ifdef _TEST
+    gettimeofday(&tv, NULL);
+    printf("worker time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec);
+#endif
+
+    if (is_destroy)
+        return -1;
+    return 0;
+}
+
+static int __process_manager_task(int worker_id, void *context, void *manager)
+{
+    VERBOSE("");
+
+    // read pid
+    zmq_msg_t pid_msg;
+    zmq_msg_init(&pid_msg);
+    if (_pims_zmq_msg_recv(&pid_msg, manager, 0) == -1)
+    {
+        ERROR("recv error : %s", zmq_strerror(errno));
+        zmq_msg_close(&pid_msg);
+        return -1;
+    }
+    zmq_msg_close(&pid_msg);
+
+    return -1;
+}
+
+static void* __worker_loop(void *args)
+{
+    void *context = args;
+    int worker_id = (int)pthread_self();
+    char *path = NULL;
+
+    void *worker = zmq_socket(context, ZMQ_DEALER);
+    if (!worker)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        return NULL;
+    }
+    if (zmq_setsockopt(worker, ZMQ_IDENTITY, &worker_id, sizeof(int)) != 0)
+    {
+        ERROR("setsockopt error : %s", zmq_strerror(errno));
+        zmq_close(worker);
+        return NULL;
+    }
+    path = g_strdup_printf("inproc://%s-%s", _g_singleton->service, PIMS_IPC_DEALER_PATH);
+    if (zmq_connect(worker, path) != 0)
+    {
+        ERROR("connect error : %s", zmq_strerror(errno));
+        g_free(path);
+        zmq_close(worker);
+        return NULL;
+    }
+    g_free(path);
+
+    // send the ID of a worker to the manager
+    void *manager = zmq_socket(context, ZMQ_DEALER);
+    if (!manager)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        zmq_close(worker);
+        return NULL;
+    }
+    if (zmq_setsockopt(manager, ZMQ_IDENTITY, &worker_id, sizeof(int)) != 0)
+    {
+        ERROR("setsockopt error : %s", zmq_strerror(errno));
+        zmq_close(manager);
+        zmq_close(worker);
+        return NULL;
+    }
+    path = g_strdup_printf("inproc://%s-%s", _g_singleton->service, PIMS_IPC_MANAGER_PATH);
+    if (zmq_connect(manager, path) != 0)
+    {
+        ERROR("connect error : %s", zmq_strerror(errno));
+        g_free(path);
+        zmq_close(manager);
+        zmq_close(worker);
+        return NULL;
+    }
+    g_free(path);
+
+    VERBOSE("starting worker id: %x", worker_id);
+    zmq_msg_t message;
+    zmq_msg_init_size(&message, sizeof(int));
+    memcpy(zmq_msg_data(&message), &worker_id, sizeof(int));
+    if (_pims_zmq_msg_send(&message, manager, 0) == -1)
+    {
+        ERROR("send error : %s", zmq_strerror(errno));
+        zmq_msg_close(&message);
+        zmq_close(manager);
+        zmq_close(worker);
+        return NULL;
+    }
+    zmq_msg_close(&message);
+
+    // poll all sockets
+    while (1)
+    {
+        zmq_pollitem_t items[] = {
+            {worker, 0, ZMQ_POLLIN, 0},
+            {manager, 0, ZMQ_POLLIN, 0}
+        };
+
+        if (zmq_poll(items, 2, -1) == -1)
+        {
+            ERROR("poll error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        if (items[0].revents & ZMQ_POLLIN)
+        {
+            if (__process_worker_task(worker_id, context, worker) != 0)
+                break;
+        }
+
+        if (items[1].revents & ZMQ_POLLIN)
+        {
+            if (__process_manager_task(worker_id, context, manager) != 0)
+                break;
+        }
+    }
+
+    VERBOSE("terminating worker id: %x", worker_id);
+
+    zmq_close(manager);
+    zmq_close(worker);
+    return NULL;
+}
+
+static void __launch_worker(void *(*start_routine) (void *), void *context)
+{
+    pthread_t worker;
+    pthread_attr_t attr;
+
+    // set kernel thread
+    pthread_attr_init(&attr);
+    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
+
+    pthread_create(&worker, &attr, start_routine, context);
+    // detach this thread
+    pthread_detach(worker);
+}
+
+static gboolean __is_worker_available()
+{
+    if (_g_singleton->workers)
+        return TRUE;
+    else
+        return FALSE;
+}
+
+static int __get_worker(const char *pid, int *worker_id)
+{
+    ASSERT(pid);
+    ASSERT(worker_id);
+
+    if (!__is_worker_available())
+    {
+        ERROR("There is no idle worker");
+        return -1;
+    }
+    *worker_id = (int)(g_list_first(_g_singleton->workers)->data);
+    _g_singleton->workers = g_list_delete_link(_g_singleton->workers,
+            g_list_first(_g_singleton->workers));
+
+    g_hash_table_insert(_g_singleton->client_table, g_strdup(pid), GINT_TO_POINTER(*worker_id));
+
+    return 0;
+}
+
+static int __find_worker(const char *pid, int *worker_id)
+{
+    char *orig_pid = NULL;
+
+    ASSERT(pid);
+    ASSERT(worker_id);
+    
+    if (g_hash_table_lookup_extended(_g_singleton->client_table, pid,
+                (gpointer*)&orig_pid, (gpointer*)worker_id) == TRUE)
+    {
+        VERBOSE("found worker id for %s = %x", pid, *worker_id);
+        return 0;
+    }
+    else
+    {
+        VERBOSE("unable to find worker id for %s", pid);
+        return -1;
+    }
+}
+
+static void __remove_worker(const char *pid)
+{
+    g_hash_table_remove(_g_singleton->client_table, pid);
+}
+
+static void __terminate_worker(void *manager, int worker_id, const char *pid)
+{
+    // send worker id
+    zmq_msg_t worker_id_msg;
+    zmq_msg_init_size(&worker_id_msg, sizeof(int));
+    memcpy(zmq_msg_data(&worker_id_msg), &worker_id, sizeof(int));
+    if (_pims_zmq_msg_send(&worker_id_msg, manager, ZMQ_SNDMORE) == -1)
+    {
+        ERROR("send error : %s", zmq_strerror(errno));
+        zmq_msg_close(&worker_id_msg);
+        return;
+    }
+    zmq_msg_close(&worker_id_msg);
+
+    // send pid
+    zmq_msg_t pid_msg;
+    zmq_msg_init_data(&pid_msg, (char*)pid, strlen(pid) + 1, NULL, NULL);
+    if (_pims_zmq_msg_send(&pid_msg, manager, 0) == -1)
+    {
+        ERROR("send error : %s", zmq_strerror(errno));
+        zmq_msg_close(&pid_msg);
+        return;
+    }
+    zmq_msg_close(&pid_msg);
+}
+
+static gboolean __enqueue_zmq_msg(zmq_msg_t *zmsg)
+{
+    zmq_msg_t *lpzmsg = NULL;
+    
+    if (zmsg)
+    {
+        lpzmsg = g_new0(zmq_msg_t, 1);
+        zmq_msg_init(lpzmsg);
+        zmq_msg_copy(lpzmsg, zmsg);
+    }
+    _g_singleton->requests = g_list_append(_g_singleton->requests, lpzmsg);
+
+    return TRUE;
+}
+
+static gboolean __dequeue_zmq_msg(zmq_msg_t *zmsg)
+{
+    zmq_msg_t *lpzmsg = NULL;
+
+    ASSERT(_g_singleton->requests);
+    lpzmsg = (zmq_msg_t*)(g_list_first(_g_singleton->requests)->data);
+    _g_singleton->requests = g_list_delete_link(_g_singleton->requests,
+            g_list_first(_g_singleton->requests));
+
+    if (lpzmsg == NULL)
+        return FALSE;
+
+    zmq_msg_copy(zmsg, lpzmsg);
+    zmq_msg_close(lpzmsg);
+    g_free(lpzmsg);
+
+    return TRUE;
+}
+
+static int __process_router_event(void *context, void *router, void *worker, gboolean for_queue)
+{
+    char *pid = NULL;
+    char *call_id = NULL;
+    int64_t more = 0;
+    size_t more_size = sizeof(more);
+    int worker_id = -1;
+    gboolean is_with_data = FALSE;
+    gboolean is_valid = FALSE;
+
+#ifdef _TEST
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    printf("router time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec);
+#endif
+
+    // init messages for receiving
+    zmq_msg_t pid_msg;
+    zmq_msg_t sequence_no_msg;
+    zmq_msg_t call_id_msg;
+    zmq_msg_t data_msg;
+    
+    zmq_msg_init(&pid_msg);
+    zmq_msg_init(&sequence_no_msg);
+    zmq_msg_init(&call_id_msg);
+    zmq_msg_init(&data_msg);
+
+    // relay a request from a client to a worker
+    do {
+        if (for_queue)
+        {
+            // dequeue a request
+            __dequeue_zmq_msg(&pid_msg);
+            __dequeue_zmq_msg(&sequence_no_msg);
+            __dequeue_zmq_msg(&call_id_msg);
+            is_with_data = __dequeue_zmq_msg(&data_msg);
+
+            if (is_with_data)
+                __dequeue_zmq_msg(NULL);
+        }
+        else
+        {
+            // read pid
+            if (_pims_zmq_msg_recv(&pid_msg, router, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            // read empty and kill
+            zmq_msg_t empty_msg;
+            zmq_msg_init(&empty_msg);
+            if (_pims_zmq_msg_recv(&empty_msg, router, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                zmq_msg_close(&empty_msg);
+                break;
+            }
+            zmq_msg_close(&empty_msg);
+
+            // read sequence no
+            more = 0;
+            zmq_getsockopt(router, ZMQ_RCVMORE, &more, &more_size);
+            if (!more)
+            {
+                ERROR("recv error : corrupted message");
+                break;
+            }
+            if (_pims_zmq_msg_recv(&sequence_no_msg, router, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            // read call id
+            more = 0;
+            zmq_getsockopt(router, ZMQ_RCVMORE, &more, &more_size);
+            if (!more)
+            {
+                ERROR("recv error : corrupted message");
+                break;
+            }
+            if (_pims_zmq_msg_recv(&call_id_msg, router, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            // read data
+            more = 0;
+            zmq_getsockopt(router, ZMQ_RCVMORE, &more, &more_size);
+            if (more)
+            {
+                is_with_data = TRUE;
+                if (_pims_zmq_msg_recv(&data_msg, router, 0) == -1)
+                {
+                    ERROR("recv error : %s", zmq_strerror(errno));
+                    break;
+                }
+            }
+        }
+
+        pid = zmq_msg_data(&pid_msg);
+        ASSERT(pid != NULL);
+        VERBOSE("client pid = %s", pid);
+
+        call_id = (char*)zmq_msg_data(&call_id_msg);
+        ASSERT(call_id != NULL);
+        VERBOSE("call_id = [%s], create_call_id = [%s]", PIMS_IPC_CALL_ID_CREATE, call_id);
+        if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0)
+        {
+            // Get a worker. If cannot get a worker, create a worker and enqueue a current request 
+            __launch_worker(__worker_loop, context);
+            if (__get_worker((const char*)pid, &worker_id) != 0)
+            {
+                // enqueue a request until a new worker will be registered
+                __enqueue_zmq_msg(&pid_msg);
+                __enqueue_zmq_msg(&sequence_no_msg);
+                __enqueue_zmq_msg(&call_id_msg);
+                if (is_with_data)
+                    __enqueue_zmq_msg(&data_msg);
+                __enqueue_zmq_msg(NULL);
+
+                is_valid = TRUE;
+                break;
+            }
+        }
+        else
+        {
+            // Find a worker
+            if (__find_worker((const char*)pid, &worker_id) != 0)
+            {
+                ERROR("unable to find a worker");
+                break;
+            }
+
+            if (strcmp(PIMS_IPC_CALL_ID_DESTROY, call_id) == 0)
+            {
+                __remove_worker((const char*)pid);
+            }
+        }
+
+        VERBOSE("routing worker id = %x", worker_id);
+        // send worker id
+        zmq_msg_t worker_id_msg;
+        zmq_msg_init_size(&worker_id_msg, sizeof(int));
+        memcpy(zmq_msg_data(&worker_id_msg), &worker_id, sizeof(int));
+        if (_pims_zmq_msg_send(&worker_id_msg, worker, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            zmq_msg_close(&worker_id_msg);
+            break;
+        }
+        zmq_msg_close(&worker_id_msg);
+
+        // send pid
+        if (_pims_zmq_msg_send(&pid_msg, worker, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // send sequence no
+        if (_pims_zmq_msg_send(&sequence_no_msg, worker, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // send call id
+        if (_pims_zmq_msg_send(&call_id_msg, worker, is_with_data?ZMQ_SNDMORE:0) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // send data
+        if (is_with_data)
+        {
+            if (_pims_zmq_msg_send(&data_msg, worker, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+
+        is_valid = TRUE;
+    } while (0);
+
+    zmq_msg_close(&pid_msg);
+    zmq_msg_close(&sequence_no_msg);
+    zmq_msg_close(&call_id_msg);
+    zmq_msg_close(&data_msg);
+
+#ifdef _TEST
+    gettimeofday(&tv, NULL);
+    printf("router time[%lu:%lu]\n", tv.tv_sec, tv.tv_usec);
+#endif
+
+    if (is_valid == FALSE)
+        return -1;
+    
+    return 0;
+}
+
+static int __process_worker_event(void *context, void *worker, void *router)
+{
+    zmq_msg_t message;
+    int64_t more = 0;
+    size_t more_size = sizeof(more);
+
+    // Remove worker_id
+    zmq_msg_init(&message);
+    if (_pims_zmq_msg_recv(&message, worker, 0) == -1)
+    {
+        ERROR("recv error : %s", zmq_strerror(errno));
+    }
+    zmq_msg_close(&message);
+
+    while (1)
+    {
+        //  Process all parts of the message
+        zmq_msg_init(&message);
+        if (_pims_zmq_msg_recv(&message, worker, 0) == -1)
+        {
+            ERROR("recv error : %s", zmq_strerror(errno));
+        }
+        more = 0;
+        zmq_getsockopt(worker, ZMQ_RCVMORE, &more, &more_size);
+        VERBOSE("router received a message : more[%u]", (unsigned int)more);
+        if (_pims_zmq_msg_send(&message, router, more?ZMQ_SNDMORE:0) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+        }
+        zmq_msg_close(&message);
+        if (!more)
+            break;      //  Last message part
+    }
+
+    return 0;
+}
+
+static int __process_manager_event(void *context, void *manager)
+{
+    zmq_msg_t worker_id_msg;
+    int worker_id = -1;
+    
+    zmq_msg_init(&worker_id_msg);
+    if (_pims_zmq_msg_recv(&worker_id_msg, manager, 0) == -1)
+    {
+        ERROR("recv error : %s", zmq_strerror(errno));
+        zmq_msg_close(&worker_id_msg);
+        return -1;
+    }
+    zmq_msg_close(&worker_id_msg);
+
+    zmq_msg_init(&worker_id_msg);
+    if (_pims_zmq_msg_recv(&worker_id_msg, manager, 0) == -1)
+    {
+        ERROR("recv error : %s", zmq_strerror(errno));
+        zmq_msg_close(&worker_id_msg);
+        return -1;
+    }
+    memcpy(&worker_id, zmq_msg_data(&worker_id_msg), sizeof(int));
+    zmq_msg_close(&worker_id_msg);
+
+    VERBOSE("registered worker id = %x", worker_id);
+    _g_singleton->workers = g_list_append(_g_singleton->workers, GINT_TO_POINTER(worker_id));
+
+    return 0;
+}
+
+static int __process_monitor_event(void *context, void *monitor, void *manager)
+{
+    int worker_id = -1;
+    char *pid = NULL;
+    zmq_msg_t pid_msg;
+
+    VERBOSE("");
+    
+    // read pid
+    zmq_msg_init(&pid_msg);
+    if (_pims_zmq_msg_recv(&pid_msg, monitor, 0) == -1)
+    {
+        ERROR("recv error : %s", zmq_strerror(errno));
+        zmq_msg_close(&pid_msg);
+        return -1;
+    }
+
+    pid = (char*)zmq_msg_data(&pid_msg);
+    ASSERT(pid);
+    VERBOSE("client pid = %s", pid);
+
+    if (__find_worker(pid, &worker_id) != 0)
+        return 0;
+
+    VERBOSE("found worker id for %s = %x", pid, worker_id);
+    
+    __terminate_worker(manager, worker_id, pid);
+    __remove_worker(pid);
+
+    zmq_msg_close(&pid_msg);
+
+    return 0;
+}
+
+static void __client_closed_cb(const char *pid, void *data)
+{
+       pims_ipc_svc_t *ipc_svc = (pims_ipc_svc_t*)data;
+    
+    VERBOSE("client pid = %s", pid);
+
+    zmq_msg_t pid_msg;
+    zmq_msg_init_size(&pid_msg, strlen(pid) + 1);
+    memcpy(zmq_msg_data(&pid_msg), pid, strlen(pid) + 1);
+    if (_pims_zmq_msg_send(&pid_msg, ipc_svc->monitor, 0) == -1)
+        ERROR("send error : %s", zmq_strerror(errno));
+    zmq_msg_close(&pid_msg);
+}
+
+static int __open_zmq_socket(void *context, pims_ipc_svc_t *ipc_svc)
+{
+    char *path = NULL;
+    int ret = -1;
+    int i = 0;
+
+    void *router = zmq_socket(context, ZMQ_ROUTER);
+    if (!router)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        return -1;
+    }
+    path = g_strdup_printf("ipc://%s", ipc_svc->service);
+    if (zmq_bind(router, path) != 0)
+    {
+        ERROR("bind error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        return -1;
+    }
+    g_free(path);
+
+    ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
+    ret = chmod(ipc_svc->service, ipc_svc->mode);
+
+    void *worker = zmq_socket(context, ZMQ_ROUTER);
+    if (!worker)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        return -1;
+    }
+    path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_DEALER_PATH);
+    if (zmq_bind(worker, path) != 0)
+    {
+        ERROR("bind error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        zmq_close(worker);
+        return -1;
+    }
+    g_free(path);
+
+    void *manager = zmq_socket(context, ZMQ_ROUTER);
+    if (!manager)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        zmq_close(worker);
+        return -1;
+    }
+    path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_MANAGER_PATH);
+    if (zmq_bind(manager, path) != 0)
+    {
+        ERROR("bind error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        zmq_close(worker);
+        zmq_close(manager);
+        return -1;
+    }
+    g_free(path);
+    
+    void *monitor = zmq_socket(context, ZMQ_PAIR);
+    if (!monitor)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        zmq_close(worker);
+        zmq_close(manager);
+        return -1;
+    }
+    path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_MONITOR2_PATH);
+    if (zmq_bind(monitor, path) != 0)
+    {
+        ERROR("bind error : %s", zmq_strerror(errno));
+        zmq_close(router);
+        zmq_close(worker);
+        zmq_close(manager);
+        zmq_close(monitor);
+        return -1;
+    }
+    g_free(path);
+
+    ipc_svc->context = context;
+    ipc_svc->router = router;
+    ipc_svc->worker = worker;
+    ipc_svc->manager = manager;
+    ipc_svc->monitor = monitor;
+
+    path = g_strdup_printf("%s-%s", ipc_svc->service, PIMS_IPC_MONITOR_PATH);
+    ret = _server_socket_init(path, ipc_svc->group, ipc_svc->mode, __client_closed_cb, ipc_svc);
+    ASSERT(ret != -1);
+    g_free(path);
+
+    // launch worker threads in advance
+    for (i = 0; i < ipc_svc->workers_max_count; i++)
+        __launch_worker(__worker_loop, context);
+
+    return 0;
+}
+
+static void __close_zmq_socket(pims_ipc_svc_t *ipc_svc)
+{
+    zmq_close(ipc_svc->router);
+    zmq_close(ipc_svc->worker);
+    zmq_close(ipc_svc->manager);
+    zmq_close(ipc_svc->monitor);
+}
+
+static int __open_zmq_socket_for_publish(void *context, pims_ipc_svc_for_publish_t *ipc_svc)
+{
+    char *path = NULL;
+    int ret = -1;
+
+    ipc_svc->context = context;
+    void *publisher = NULL;
+    publisher = zmq_socket(context, ZMQ_PUB);
+    if (!publisher)
+    {
+        ERROR("socket error : %s", zmq_strerror(errno));
+        return -1;
+    }
+
+    path = g_strdup_printf("ipc://%s", ipc_svc->service);
+    if (zmq_bind(publisher, path) != 0)
+    {
+        ERROR("bind error : %s", zmq_strerror(errno));
+        zmq_close(publisher);
+        return -1;
+    }
+    g_free(path);
+
+    ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
+    ret = chmod(ipc_svc->service, ipc_svc->mode);
+
+    ipc_svc->context = context;
+    ipc_svc->publisher = publisher;
+
+    return 0;
+}
+
+static void __close_zmq_socket_for_publish(pims_ipc_svc_for_publish_t *ipc_svc)
+{
+    zmq_close(ipc_svc->publisher);
+}
+
+static void* __main_loop(void *args)
+{
+    char *path = NULL;
+    int ret = -1;
+       pims_ipc_svc_t *ipc_svc = (pims_ipc_svc_t*)args;
+
+    void *monitor_peer = zmq_socket(ipc_svc->context, ZMQ_PAIR);
+    ASSERT(monitor_peer);
+    
+    path = g_strdup_printf("inproc://%s-%s", ipc_svc->service, PIMS_IPC_MONITOR2_PATH);
+    ret = zmq_connect(monitor_peer, path);
+    ASSERT(ret == 0);
+    g_free(path);
+
+    // poll all sockets
+    while (1)
+    {
+        zmq_pollitem_t items[] = {
+            {ipc_svc->router, 0, ZMQ_POLLIN, 0},
+            {ipc_svc->worker, 0, ZMQ_POLLIN, 0},
+            {ipc_svc->manager, 0, ZMQ_POLLIN, 0},
+            {monitor_peer, 0, ZMQ_POLLIN, 0}
+        };
+
+        if (zmq_poll(items, 4, -1) == -1)
+        {
+            if (errno == EINTR)
+                continue;
+
+            ERROR("poll error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        if (items[0].revents & ZMQ_POLLIN)
+        {
+            __process_router_event(ipc_svc->context, ipc_svc->router, ipc_svc->worker, FALSE);
+        }
+
+        if (items[1].revents & ZMQ_POLLIN)
+        {
+            __process_worker_event(ipc_svc->context, ipc_svc->worker, ipc_svc->router);
+        }
+
+        if (items[2].revents & ZMQ_POLLIN)
+        {
+            __process_manager_event(ipc_svc->context, ipc_svc->manager);
+            if (ipc_svc->requests)
+                __process_router_event(ipc_svc->context, ipc_svc->router, ipc_svc->worker, TRUE);
+        }
+
+        if (items[3].revents & ZMQ_POLLIN)
+        {
+            __process_monitor_event(ipc_svc->context, monitor_peer, ipc_svc->manager);
+        }
+    }
+    
+    zmq_close(monitor_peer);
+
+    return NULL;
+}
+
+API void pims_ipc_svc_run_main_loop(GMainLoop* loop)
+{
+    int retval = -1;
+    GMainLoop* main_loop = loop;
+
+    if(main_loop == NULL) {
+        main_loop = g_main_loop_new(NULL, FALSE);
+    }
+
+    void *context = zmq_init(1);
+    ASSERT (context != NULL);
+
+    if (_g_singleton_for_publish)
+    {
+        retval = __open_zmq_socket_for_publish(context, _g_singleton_for_publish);
+        ASSERT(retval == 0);
+    }
+
+    if (_g_singleton)
+    {
+        retval = __open_zmq_socket(context, _g_singleton);
+        ASSERT(retval == 0);
+    }
+
+    __launch_worker(__main_loop, _g_singleton);
+
+    g_main_loop_run(main_loop);
+
+    if (_g_singleton)
+    {
+        __close_zmq_socket(_g_singleton);
+    }
+
+    if (_g_singleton_for_publish)
+    {
+        __close_zmq_socket_for_publish(_g_singleton_for_publish);
+    }
+
+    if (zmq_term(context) == -1)
+        WARNING("term error : %s", zmq_strerror(errno));
+}
diff --git a/src/pims-ipc.c b/src/pims-ipc.c
new file mode 100644 (file)
index 0000000..5f9be24
--- /dev/null
@@ -0,0 +1,813 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <glib.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <pthread.h>
+
+#include <pims-internal.h>
+#include <pims-socket.h>
+#include <pims-debug.h>
+#include <pims-ipc-data.h>
+#include <pims-ipc.h>
+
+#define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
+    sequence_no = ++((handle)->call_sequence_no);\
+} while (0)
+
+static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
+
+typedef enum
+{
+    PIMS_IPC_CALL_STATUS_READY = 0,
+    PIMS_IPC_CALL_STATUS_IN_PROGRESS
+} pims_ipc_call_status_e;
+
+typedef enum
+{
+    PIMS_IPC_MODE_REQ = 0,
+    PIMS_IPC_MODE_SUB
+} pims_ipc_mode_e;
+
+typedef struct
+{
+    pid_t pid;
+    void *context;
+    unsigned int ref_cnt;
+    unsigned int id_sequence_no;
+    GList *subscribe_handles;
+} pims_ipc_context_t;
+
+static pims_ipc_context_t *_g_singleton = NULL;
+
+typedef struct
+{
+    pims_ipc_subscribe_cb callback;
+    void * user_data;
+} pims_ipc_cb_t;
+
+typedef struct
+{
+    int fd;
+    void *requester;
+    char *service;
+    char *id;
+    GIOChannel *async_channel;
+    guint async_source_id;
+    pims_ipc_call_status_e call_status;
+    unsigned int call_sequence_no;
+    pims_ipc_call_async_cb call_async_callback;
+    void *call_async_userdata;
+    GHashTable *subscribe_cb_table;
+} pims_ipc_t;
+
+
+static void __pims_ipc_free_handle(pims_ipc_t *handle)
+{
+    pthread_mutex_lock(&__gmutex);
+
+    g_free(handle->id);
+    g_free(handle->service);
+
+    if (handle->requester)
+        zmq_close(handle->requester);
+
+    if (handle->fd != -1)
+        close(handle->fd);
+    
+    if (handle->async_channel)
+    {
+        // remove a subscriber handle from the golbal list
+        if (_g_singleton)
+        {
+            _g_singleton->subscribe_handles = g_list_remove(_g_singleton->subscribe_handles, handle);
+            VERBOSE("the count of subscribe handles = %d", g_list_length(_g_singleton->subscribe_handles));
+        }
+
+        g_source_remove(handle->async_source_id);
+        g_io_channel_unref(handle->async_channel);
+    }
+
+    if (handle->subscribe_cb_table)
+        g_hash_table_destroy(handle->subscribe_cb_table);
+
+    g_free(handle);
+
+    if (_g_singleton && --_g_singleton->ref_cnt <= 0)
+    {
+        if (zmq_term(_g_singleton->context) == -1)
+        {
+            WARNING("term error : %s", zmq_strerror(errno));
+        }
+        g_free(_g_singleton);
+        _g_singleton = NULL;
+    }
+    
+    pthread_mutex_unlock(&__gmutex);
+}
+
+static int __pims_ipc_receive_for_subscribe(pims_ipc_t *handle)
+{
+    gboolean is_valid = FALSE;
+    int64_t more = 0;
+    pims_ipc_data_h dhandle = NULL;
+    pims_ipc_cb_t *cb_data = NULL;
+   
+    zmq_msg_t call_id_msg;
+    zmq_msg_t data_msg;
+
+    zmq_msg_init(&call_id_msg);
+    zmq_msg_init(&data_msg);
+
+    do {
+        // recv call id 
+        if (_pims_zmq_msg_recv(&call_id_msg, handle->requester, 0) == -1)
+        {
+            ERROR("recv error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        // find a callback by a call id
+        cb_data = (pims_ipc_cb_t*)g_hash_table_lookup(handle->subscribe_cb_table, zmq_msg_data(&call_id_msg));
+
+        size_t more_size = sizeof(more);
+        zmq_getsockopt(handle->requester, ZMQ_RCVMORE, &more, &more_size);
+        if (more)
+        {
+            if (_pims_zmq_msg_recv(&data_msg, handle->requester, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            } 
+            
+            if (cb_data == NULL)
+            {
+                VERBOSE("unable to find %s", (char*)zmq_msg_data(&call_id_msg));
+                is_valid = TRUE;
+                break;
+            }
+
+            dhandle = pims_ipc_data_unmarshal_with_zmq(&data_msg);
+            if (dhandle == NULL)
+            {
+                ERROR("unmarshal error");
+                break;
+            }
+
+            cb_data->callback((pims_ipc_h)handle, dhandle, cb_data->user_data);
+        }
+
+        is_valid = TRUE;
+    } while (0);
+
+    zmq_msg_close(&call_id_msg);
+    zmq_msg_close(&data_msg);
+    if (dhandle)
+        pims_ipc_data_destroy(dhandle);
+
+    if (is_valid == FALSE)
+        return -1;
+    return 0;
+}
+
+static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
+{
+    pims_ipc_t *handle = (pims_ipc_t *)data;
+    uint32_t zmq_events = 0;
+    size_t opt_len = 0;
+    int rc = 0;
+
+    VERBOSE("");
+
+    pthread_mutex_lock(&__gmutex);
+
+    // check if a subscriber handle is exists
+    if (_g_singleton == NULL || g_list_find(_g_singleton->subscribe_handles, handle) == NULL)
+    {
+        DEBUG("No such handle that ID is %p", handle);
+        pthread_mutex_unlock(&__gmutex);
+        return FALSE;
+    }
+    
+    opt_len = sizeof(uint32_t);
+    while (1)
+    {
+        rc = zmq_getsockopt(handle->requester, ZMQ_EVENTS, &zmq_events, &opt_len);
+        ASSERT(rc == 0);
+        if (ZMQ_POLLIN & zmq_events) {
+            __pims_ipc_receive_for_subscribe(handle);
+        }
+        else
+        {
+            break;
+        }
+    }
+
+    pthread_mutex_unlock(&__gmutex);
+
+    return TRUE;
+}
+
+static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
+{
+    pims_ipc_context_t *ghandle = NULL;
+    pims_ipc_t *handle = NULL;
+    pid_t pid = 0;
+    void *context = NULL;
+    void *requester = NULL;
+    char *path = NULL;
+    gboolean is_ok = FALSE;
+        
+    pthread_mutex_lock(&__gmutex);
+
+    do {
+        if (_g_singleton == NULL)
+        {
+            ghandle = g_new0(pims_ipc_context_t, 1);
+            if (ghandle == NULL)
+            {
+                ERROR("Failed to allocation");
+                break;
+            }
+            
+            pid = getpid();
+            ghandle->pid = pid;
+            VERBOSE("The PID of the current process is %d.", pid);
+
+            context = zmq_init(1);
+            if (!context)
+            {
+                ERROR("init error : %s", zmq_strerror(errno));
+                break;
+            }
+            ghandle->context = context;
+            ghandle->id_sequence_no = (unsigned int)time(NULL);
+            ghandle->ref_cnt = 1;
+            _g_singleton = ghandle;
+        }
+        else
+        {
+            ghandle = _g_singleton;
+            ghandle->ref_cnt++;
+            pid = ghandle->pid;
+            context = ghandle->context;
+        }
+
+        VERBOSE("Create %d th..", ghandle->ref_cnt);
+
+        handle = g_new0(pims_ipc_t, 1);
+        if (handle == NULL)
+        {
+            ERROR("Failed to allocation");
+            break;
+        }
+        handle->fd = -1;
+
+        handle->service = g_strdup(service);
+        handle->id = g_strdup_printf("%x:%x", pid, ghandle->id_sequence_no++);
+        
+        if (mode == PIMS_IPC_MODE_REQ)
+        {
+            path = g_strdup_printf("%s-%s", handle->service, PIMS_IPC_MONITOR_PATH);
+            handle->fd = _client_socket_init(path, handle->id);
+            if (handle->fd == -1)
+            {
+                g_free(path);
+                break;
+            }
+            g_free(path);
+
+            requester = zmq_socket(context, ZMQ_REQ);
+            if (!requester)
+            {
+                ERROR("socket error : %s", zmq_strerror(errno));
+                break;
+            }
+            if (zmq_setsockopt(requester, ZMQ_IDENTITY, handle->id, strlen(handle->id) + 1) != 0)
+            {
+                ERROR("setsockopt error : %s", zmq_strerror(errno));
+                break;
+            }
+            handle->requester = requester;
+
+            path = g_strdup_printf("ipc://%s", handle->service);
+            if (zmq_connect(requester, path) != 0)
+            {
+                ERROR("connect error : %s", zmq_strerror(errno));
+                g_free(path);
+                break;
+            }
+            g_free(path);
+
+            handle->call_sequence_no = (unsigned int)time(NULL);
+            if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0)
+            {
+                WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
+            }
+        }
+        else
+        {
+            requester = zmq_socket(context, ZMQ_SUB);
+            if (!requester)
+            {
+                ERROR("socket error : %s", zmq_strerror(errno));
+                break;
+            }
+            if (zmq_setsockopt(requester, ZMQ_SUBSCRIBE, "", 0) != 0)
+            {
+                ERROR("setsockopt error : %s", zmq_strerror(errno));
+                break;
+            }
+            handle->requester = requester;
+
+            path = g_strdup_printf("ipc://%s", handle->service);
+            if (zmq_connect(requester, path) != 0)
+            {
+                ERROR("connect error : %s", zmq_strerror(errno));
+                g_free(path);
+                break;
+            }
+            g_free(path);
+
+            int fd = -1;
+            size_t opt_len = sizeof(int);
+            int rc = zmq_getsockopt(handle->requester, ZMQ_FD, &fd, &opt_len);
+            ASSERT(rc == 0);
+
+            handle->async_channel = g_io_channel_unix_new(fd);
+            if (!handle->async_channel)
+            {
+                ERROR("g_io_channel_unix_new error");
+                break;
+            }
+
+            guint source_id = 0;
+            source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_subscribe_handler, handle);
+            handle->async_source_id = source_id;
+            handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
+            ASSERT(handle->subscribe_cb_table);
+
+            // add a subscriber handle to the global list
+            ghandle->subscribe_handles = g_list_append(ghandle->subscribe_handles, handle);
+            VERBOSE("the count of subscribe handles = %d", g_list_length(ghandle->subscribe_handles));
+        }
+
+        is_ok = TRUE;
+        VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
+    } while(0);
+
+    pthread_mutex_unlock(&__gmutex);
+
+    if (FALSE == is_ok)
+    {
+        if (handle)
+        {
+            __pims_ipc_free_handle(handle);
+            handle = NULL;
+        }
+    }
+
+    return handle;
+}
+
+API pims_ipc_h pims_ipc_create(char *service)
+{
+    return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
+}
+
+API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
+{
+    return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
+}
+
+static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
+{
+    pims_ipc_t *handle = (pims_ipc_t *)ipc;
+
+    if (mode == PIMS_IPC_MODE_REQ)
+    {
+        if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0)
+        {
+            WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
+        }
+    }
+
+    __pims_ipc_free_handle(handle);
+}
+
+API void pims_ipc_destroy(pims_ipc_h ipc)
+{
+    __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
+}
+
+API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
+{
+    __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
+}
+
+static void __pims_ipc_data_free_cb(void *data, void *hint)
+{
+    if (hint)
+        g_free(hint);
+}
+
+static int __pims_ipc_send(pims_ipc_t *handle, char *module, char *function, pims_ipc_h data_in)
+{
+    gboolean is_valid = FALSE;
+    unsigned int sequence_no = 0;
+    gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
+
+    // init messages
+    zmq_msg_t sequence_no_msg;
+    zmq_msg_t call_id_msg;
+    zmq_msg_t data_in_msg;
+
+    zmq_msg_init_size(&sequence_no_msg, sizeof(unsigned int));
+    GET_CALL_SEQUNECE_NO(handle, sequence_no);
+    memcpy(zmq_msg_data(&sequence_no_msg), &(sequence_no), sizeof(unsigned int));
+
+    zmq_msg_init_data(&call_id_msg, call_id, strlen(call_id) + 1, __pims_ipc_data_free_cb, call_id);
+    VERBOSE("call id = %s", (char*)zmq_msg_data(&call_id_msg));
+
+    zmq_msg_init(&data_in_msg);
+
+    do {
+        // send sequence no
+        if (_pims_zmq_msg_send(&sequence_no_msg, handle->requester, ZMQ_SNDMORE) == -1)
+        {
+            ERROR("send error : %s", zmq_strerror(errno));
+            break;
+        }
+
+        if (data_in == NULL)
+        {
+            // send call id
+            if (_pims_zmq_msg_send(&call_id_msg, handle->requester, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+        else
+        {
+            // send call id
+            if (_pims_zmq_msg_send(&call_id_msg, handle->requester, ZMQ_SNDMORE) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            // marshal data
+            if (pims_ipc_data_marshal_with_zmq(data_in, &data_in_msg) != 0)
+            {
+                ERROR("marshal error");
+                break;
+            }
+
+            VERBOSE("the size of sending data = %d", zmq_msg_size(&data_in_msg));
+            
+            // send data
+            if (_pims_zmq_msg_send(&data_in_msg, handle->requester, 0) == -1)
+            {
+                ERROR("send error : %s", zmq_strerror(errno));
+                break;
+            }
+        }
+
+        is_valid = TRUE;
+    } while (0);
+
+    zmq_msg_close(&sequence_no_msg);
+    zmq_msg_close(&call_id_msg);
+    zmq_msg_close(&data_in_msg);
+
+    if (is_valid == FALSE)
+        return -1;
+    return 0;
+}
+
+static int __pims_ipc_receive(pims_ipc_t *handle, pims_ipc_data_h *data_out)
+{
+    gboolean is_ok = FALSE;
+    gboolean is_valid = FALSE;
+    int64_t more = 0;
+    pims_ipc_data_h dhandle = NULL;
+    unsigned int sequence_no = 0;
+   
+    zmq_msg_t sequence_no_msg;
+    zmq_msg_t call_id_msg;
+    zmq_msg_t data_out_msg;
+
+    while (1)
+    {
+        is_valid = FALSE;
+        more = 0;
+
+        zmq_msg_init(&sequence_no_msg);
+        zmq_msg_init(&call_id_msg);
+        zmq_msg_init(&data_out_msg);
+
+        do {
+            // recv sequence no
+            if (_pims_zmq_msg_recv(&sequence_no_msg, handle->requester, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            }
+            memcpy(&sequence_no, zmq_msg_data(&sequence_no_msg), sizeof(unsigned int));
+
+            // recv call id 
+            if (_pims_zmq_msg_recv(&call_id_msg, handle->requester, 0) == -1)
+            {
+                ERROR("recv error : %s", zmq_strerror(errno));
+                break;
+            }
+
+            size_t more_size = sizeof(more);
+            zmq_getsockopt(handle->requester, ZMQ_RCVMORE, &more, &more_size);
+            if (more)
+            {
+                if (_pims_zmq_msg_recv(&data_out_msg, handle->requester, 0) == -1)
+                {
+                    ERROR("recv error : %s", zmq_strerror(errno));
+                    break;
+                }
+                dhandle = pims_ipc_data_unmarshal_with_zmq(&data_out_msg);
+                if (dhandle == NULL)
+                {
+                    ERROR("unmarshal error");
+                    break;
+                }
+
+                if (sequence_no == handle->call_sequence_no)
+                {
+                    if (data_out != NULL)
+                        *data_out = dhandle;
+                    is_ok = TRUE;
+                }
+                else
+                {
+                    pims_ipc_data_destroy(dhandle);
+                    DEBUG("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
+                }
+            }
+            else
+            {
+                if (sequence_no == handle->call_sequence_no)
+                    is_ok = TRUE;
+            }
+
+            is_valid = TRUE;
+        } while (0);
+
+        zmq_msg_close(&sequence_no_msg);
+        zmq_msg_close(&call_id_msg);
+        zmq_msg_close(&data_out_msg);
+
+        if (is_ok)
+            return 0;
+
+        if (is_valid == FALSE)
+            return -1;
+    }
+
+    return -1;
+}
+
+API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
+                      pims_ipc_data_h *data_out)
+{
+    pims_ipc_t *handle = (pims_ipc_t *)ipc;
+
+
+    if (ipc == NULL)
+    {
+        ERROR("invalid handle : %p", ipc);
+        return -1;
+    }
+
+    if (!module || !function)
+    {
+        ERROR("invalid argument");
+        return -1;
+    }
+
+    if (handle->call_status != PIMS_IPC_CALL_STATUS_READY)
+    {
+        ERROR("the previous call is in progress : %p", ipc);
+        return -1;
+    }
+
+    if (__pims_ipc_send(handle, module, function, data_in) != 0)
+    {
+        return -1;
+    }
+
+    if (__pims_ipc_receive(handle, data_out) != 0)
+    {
+        return -1;
+    }
+    
+    return 0;
+}
+
+static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
+{
+    pims_ipc_t *handle = (pims_ipc_t *)data;
+    uint32_t zmq_events = 0;
+    size_t opt_len = 0;
+    int rc = 0;
+
+    VERBOSE("");
+
+    opt_len = sizeof(uint32_t);
+    while (1)
+    {
+        rc = zmq_getsockopt(handle->requester, ZMQ_EVENTS, &zmq_events, &opt_len);
+        ASSERT(rc == 0);
+        if (ZMQ_POLLIN & zmq_events) {
+            pims_ipc_data_h dhandle = NULL;
+            if (__pims_ipc_receive(handle, &dhandle) == 0)
+            {
+                VERBOSE("call status = %d", handle->call_status);
+                if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS)
+                {
+                    pims_ipc_data_destroy(dhandle);
+                }
+                else
+                {
+                    handle->call_status = PIMS_IPC_CALL_STATUS_READY;
+                    handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
+                    pims_ipc_data_destroy(dhandle);
+                }
+            }
+        }
+        else
+        {
+            break;
+        }
+    }
+
+    return FALSE;
+}
+
+API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
+                            pims_ipc_call_async_cb callback, void *userdata)
+{
+    pims_ipc_t *handle = (pims_ipc_t *)ipc;
+    guint source_id = 0;
+
+    if (ipc == NULL)
+    {
+        ERROR("invalid handle : %p", ipc);
+        return -1;
+    }
+
+    if (!module || !function || !callback)
+    {
+        ERROR("invalid argument");
+        return -1;
+    }
+
+    if (handle->call_status != PIMS_IPC_CALL_STATUS_READY)
+    {
+        ERROR("the previous call is in progress : %p", ipc);
+        return -1;
+    }
+
+    handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
+    handle->call_async_callback = callback;
+    handle->call_async_userdata = userdata;
+
+    // add a callback for GIOChannel
+    if (!handle->async_channel)
+    {
+        int fd = -1;
+        size_t opt_len = sizeof(int);
+        int rc = zmq_getsockopt(handle->requester, ZMQ_FD, &fd, &opt_len);
+        ASSERT(rc == 0);
+
+        handle->async_channel = g_io_channel_unix_new(fd);
+        if (!handle->async_channel)
+        {
+            ERROR("g_io_channel_unix_new error");
+            return -1;
+        }
+    }
+    
+    source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
+    handle->async_source_id = source_id;
+
+    if (__pims_ipc_send(handle, module, function, data_in) != 0)
+    {
+        g_source_remove(source_id);
+        return -1;
+    }
+    
+    uint32_t zmq_events = 0;
+    size_t opt_len = sizeof(uint32_t);
+    int rc = 0;
+    rc = zmq_getsockopt(handle->requester, ZMQ_EVENTS, &zmq_events, &opt_len);
+    ASSERT(rc == 0);
+
+    return 0;
+}
+
+API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
+{
+    pims_ipc_t *handle = (pims_ipc_t *)ipc;
+
+    if (ipc == NULL)
+    {
+        ERROR("invalid handle : %p", ipc);
+        return false;
+    }
+
+    if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
+        return true;
+    else
+        return false;
+}
+
+API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
+{
+    gchar *call_id = NULL;
+    pims_ipc_cb_t *cb_data = NULL;
+    pims_ipc_t *handle = (pims_ipc_t *)ipc;
+
+    if (ipc == NULL || handle->subscribe_cb_table == NULL)
+    {
+        ERROR("invalid handle : %p", ipc);
+        return -1;
+    }
+
+    if (!module || !event || !callback)
+    {
+        ERROR("invalid argument");
+        return -1;
+    }
+
+    cb_data = g_new0(pims_ipc_cb_t, 1);
+    call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
+
+    VERBOSE("subscribe cb id[%s]", call_id);
+    cb_data->callback = callback;
+    cb_data->user_data = userdata;
+    g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
+
+    return 0;
+}
+
+API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
+{
+    gchar *call_id = NULL;
+    pims_ipc_t *handle = (pims_ipc_t *)ipc;
+
+    if (ipc == NULL || handle->subscribe_cb_table == NULL)
+    {
+        ERROR("invalid handle : %p", ipc);
+        return -1;
+    }
+
+    if (!module || !event)
+    {
+        ERROR("invalid argument");
+        return -1;
+    }
+
+    call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
+
+    VERBOSE("unsubscribe cb id[%s]", call_id);
+
+    if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE)
+    {
+        ERROR("g_hash_table_remove error");
+        g_free(call_id);
+        return -1;
+    }
+
+    g_free(call_id);
+    return 0;
+}
diff --git a/src/pims-socket.c b/src/pims-socket.c
new file mode 100644 (file)
index 0000000..f2ee131
--- /dev/null
@@ -0,0 +1,286 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef _NON_SLP
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <glib.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <systemd/sd-daemon.h>
+
+#include <pims-internal.h>
+#include <pims-debug.h>
+#include <pims-socket.h>
+
+typedef struct
+{
+    int sockfd;
+    server_socket_client_closed_cb callback;
+    void *user_data;
+    GHashTable *client_table;
+} server_socket_context_t;
+
+static int __socket_writen(int fd, char *buf, int buf_size)
+{
+    int ret, writed = 0;
+    while (buf_size)
+    {
+        ret = write(fd, buf+writed, buf_size);
+        if (-1 == ret)
+        {
+            if (EINTR == errno)
+                continue;
+            else
+                return ret;
+        }
+        writed += ret;
+        buf_size -= ret;
+    }
+    return writed;
+}
+
+static int __socket_readn(int fd, char *buf, int buf_size)
+{
+    int ret, read_size = 0;
+
+    while (buf_size)
+    {
+        ret = read(fd, buf+read_size, buf_size);
+        if (-1 == ret)
+        {
+            if (EINTR == errno)
+                continue;
+            else
+                return ret;
+        }
+        read_size += ret;
+        buf_size -= ret;
+    }
+    return read_size;
+}
+
+#define PIMS_IPC_PID_BUFFER_SIZE    20
+static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data)
+{
+    server_socket_context_t *context = (server_socket_context_t*)data;
+    int ret = -1;
+    int fd = -1;
+    int orig_fd = -1;
+    char *pid = NULL;
+    char buffer[PIMS_IPC_PID_BUFFER_SIZE] = "";
+
+    fd = g_io_channel_unix_get_fd(src);
+
+    if (G_IO_HUP & condition)
+    {
+        close(fd);
+
+        if (g_hash_table_lookup_extended(context->client_table, GINT_TO_POINTER(fd),
+                    (gpointer*)&orig_fd, (gpointer*)&pid) == TRUE)
+        {
+            VERBOSE("found pid for %u = %s", fd, pid);
+            context->callback((const char*)pid, context->user_data);
+            g_hash_table_remove(context->client_table, (gconstpointer)fd);
+        }
+        else
+        {
+            VERBOSE("unable to find pid for %u", fd);
+        }
+
+        return FALSE;
+    }
+
+    memset(buffer, 0x00, PIMS_IPC_PID_BUFFER_SIZE);
+    ret = read(fd, (char *)buffer, PIMS_IPC_PID_BUFFER_SIZE-1);
+    if (ret <= 0)
+    {
+        ERROR("read error : %s", strerror(errno));
+        close(fd);
+
+        return FALSE;
+    }
+
+    VERBOSE("client fd = %u, pid = %s", fd, buffer);
+    g_hash_table_insert(context->client_table, GINT_TO_POINTER(fd), g_strdup(buffer));
+
+    pid_t mypid = getpid();
+    ret = __socket_writen(fd, (char*)&mypid, sizeof(pid_t));
+    if (ret != sizeof(pid_t))
+    {
+        ERROR("write error : %s", strerror(errno));
+        close(fd);
+        g_hash_table_remove(context->client_table, (gconstpointer)fd);
+
+        return FALSE;
+    }
+
+   return TRUE;
+}
+
+static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data)
+{
+    GIOChannel *channel;
+    server_socket_context_t *context = (server_socket_context_t*)data;
+    int client_sockfd = -1;
+    int sockfd = context->sockfd;
+    struct sockaddr_un clientaddr;
+    socklen_t client_len = sizeof(clientaddr);
+
+    client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len);
+    if (-1 == client_sockfd)
+    {
+        ERROR("accept error : %s", strerror(errno));
+        return TRUE;
+    }
+
+    channel = g_io_channel_unix_new(client_sockfd);
+    g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data);
+    g_io_channel_unref(channel);
+
+    return TRUE;
+}
+
+int _server_socket_init(const char *path, gid_t group, mode_t mode,
+        server_socket_client_closed_cb callback, void *user_data)
+{
+    int sockfd = -1;
+    GIOChannel *gio = NULL;
+
+    if (sd_listen_fds(1) == 1 && sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, path, 0) > 0)
+    {
+        DEBUG("using system daemon");
+
+        sockfd = SD_LISTEN_FDS_START;
+    }
+    else
+    {
+        struct sockaddr_un addr;
+        int ret = -1;
+        
+        DEBUG("using local socket");
+
+        unlink(path);
+
+        bzero(&addr, sizeof(addr));
+        addr.sun_family = AF_UNIX;
+        snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);
+
+        sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
+        if (-1 == sockfd)
+        {
+            ERROR("socket error : %s", strerror(errno));
+            return -1;
+        }
+
+        ret = bind(sockfd, (struct sockaddr *)&addr, sizeof(addr));
+        if (-1 == ret)
+        {
+            ERROR("bind error : %s", strerror(errno));
+            close(sockfd);
+            return -1;
+        }
+
+        ret = chown(path, getuid(), group);
+        ret = chmod(path, mode);
+
+        ret = listen(sockfd, 30);
+        if (-1 == ret)
+        {
+            ERROR("listen error : %s", strerror(errno));
+            close(sockfd);
+            return -1;
+        }
+    }
+
+    gio = g_io_channel_unix_new(sockfd);
+
+    server_socket_context_t *context = g_new0(server_socket_context_t, 1);
+    context->sockfd = sockfd;
+    context->callback = callback;
+    context->user_data = user_data;
+    context->client_table = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, g_free);
+    ASSERT(context->client_table);
+
+    g_io_add_watch(gio, G_IO_IN, __socket_handler, (gpointer)context);
+
+    return sockfd;
+}
+
+int _client_socket_init(const char *path, const char *pid)
+{
+    int sockfd = -1;
+    int ret = -1;
+    struct sockaddr_un caddr = {0};
+    pid_t server_pid = 0;
+
+    ASSERT(path != NULL);
+    ASSERT(pid != NULL);
+    bzero(&caddr, sizeof(caddr));
+    caddr.sun_family = AF_UNIX;
+    snprintf(caddr.sun_path, sizeof(caddr.sun_path), "%s", path);
+
+    sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
+    if (-1 == sockfd)
+    {
+        ERROR("socket error : %s", strerror(errno));
+        return -1;
+    }
+
+    ret = connect(sockfd, (struct sockaddr *)&caddr, sizeof(caddr));
+    if (-1 == ret) {
+        ERROR("connect error : %s", strerror(errno));
+        close(sockfd);
+        return -1;
+    }
+    ret = __socket_writen(sockfd, (char*)pid, strlen(pid) + 1);
+    if (ret <= 0)
+    {
+        ERROR("write error : %s", strerror(errno));
+        close(sockfd);
+        return -1;
+    }
+    ret = __socket_readn(sockfd, (char*)&server_pid, sizeof(pid_t));
+    if (ret != sizeof(pid_t))
+    {
+        ERROR("read error : %s", strerror(errno));
+        close(sockfd);
+        return -1;
+    }
+
+    return sockfd;
+}
+
+#else
+#include <pims-socket.h>
+
+int _server_socket_init(const char *path, gid_t group, mode_t mode,
+        server_socket_client_closed_cb callback, void *user_data)
+{
+    return 0;
+}
+
+int _client_socket_init(const char *path, const char *pid)
+{
+    return 0;
+}
+
+#endif
diff --git a/src/pims-socket.h b/src/pims-socket.h
new file mode 100644 (file)
index 0000000..5cf2696
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __PIMS_SOCKET_H__
+#define __PIMS_SOCKET_H__
+
+#include <unistd.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#ifdef _cplusplus
+extern "C"
+{
+#endif
+
+typedef void (*server_socket_client_closed_cb)(const char *pid, void *user_data);
+int _server_socket_init(const char *path, gid_t group, mode_t mode,
+        server_socket_client_closed_cb callback, void *user_data);
+int _client_socket_init(const char *path, const char *pid);
+
+#ifdef _cplusplus
+}
+#endif
+
+#endif /* __PIMS_SOCKET_H__ */
+
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
new file mode 100755 (executable)
index 0000000..fb2513b
--- /dev/null
@@ -0,0 +1,44 @@
+CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
+PROJECT(pims_ipc_test C)
+
+####################################################################
+# Basic configuration                                              #
+####################################################################
+# Set ready to build
+SET(EXT_LIBS_DIRS "")
+SET(EXT_LIBS_DEFS "")
+SET(EXT_LIBS_LDFLAGS "")
+
+# Set external libraries
+SET(EXT_LIBS
+    E_GLIB
+)
+
+FOREACH(flag ${EXT_LIBS})
+    SET(EXT_LIBS_DIRS       ${EXT_LIBS_DIRS}    ${${flag}_INCLUDE_DIRS})
+    SET(EXT_LIBS_DEFS       ${EXT_LIBS_DEFS}    ${${flag}_CFLAGS_OTHER})
+    SET(EXT_LIBS_LDFLAGS    ${EXT_LIBS_LDFLAGS} ${${flag}_LDFLAGS})
+ENDFOREACH(flag)
+
+####################################################################
+# Build this project                                               #
+####################################################################
+
+# Set source
+SET(SRCS
+    ${CMAKE_CURRENT_SOURCE_DIR}/test.c
+)
+
+INCLUDE_DIRECTORIES(
+    ${CMAKE_SOURCE_DIR}/include
+    ${EXT_LIBS_DIRS}
+)
+
+ADD_DEFINITIONS(
+    ${EXT_LIBS_DEFS}
+)
+
+ADD_EXECUTABLE(${PROJECT_NAME} ${SRCS})
+TARGET_LINK_LIBRARIES(${PROJECT_NAME} pims-ipc ${EXT_LIBS_LDFLAGS})
+INSTALL(TARGETS ${PROJECT_NAME} DESTINATION lib)
+
diff --git a/test/sock-test.c b/test/sock-test.c
new file mode 100644 (file)
index 0000000..b086864
--- /dev/null
@@ -0,0 +1,178 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define NAME "dom_sock_test"
+
+#define REPEAT_COUNT    100
+//#define BUFFER_SIZE 1024*1024
+#define BUFFER_SIZE 1024
+
+#define PLUS_TIME(a, b, c) do {\
+    a.tv_sec = b.tv_sec + c.tv_sec;\
+    a.tv_usec = b.tv_usec + c.tv_usec;\
+    if (a.tv_usec >= 1000000) {\
+        a.tv_sec++;\
+        a.tv_usec -= 1000000;\
+    }\
+} while (0)
+
+#define MINUS_TIME(a, b, c) do {\
+    a.tv_sec = b.tv_sec - c.tv_sec;\
+    a.tv_usec = b.tv_usec - c.tv_usec;\
+    if (a.tv_usec < 0) {\
+        a.tv_sec--;\
+        a.tv_usec += 1000000;\
+    }\
+} while (0)
+
+
+static char buffer[BUFFER_SIZE+1] = {0,};
+
+void server_main()
+{
+    int sock, msgsock, rval;
+    struct sockaddr_un server;
+
+    sock = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (sock < 0) {
+        perror("opening stream socket");
+        exit(1);
+    }
+
+    server.sun_family = AF_UNIX;
+    strcpy(server.sun_path, NAME);
+    if (bind(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un))) {
+        perror("binding stream socket");
+        exit(1);
+    }
+    printf("Socket has name %s\n", server.sun_path);
+    listen(sock, 5);
+    for (;;) {
+        msgsock = accept(sock, 0, 0);
+        if (msgsock == -1)
+            perror("accept");
+        else do {
+            if ((rval = read(msgsock, buffer, BUFFER_SIZE)) < 0)
+                perror("reading stream message");
+            else if (rval == 0)
+                printf("Ending connection\n");
+            else
+            {
+                if ((rval = write(msgsock, buffer, BUFFER_SIZE)) < 0)
+                    perror("writing stream message");
+
+            }
+        } while (rval > 0);
+        close(msgsock);
+    }
+    close(sock);
+    unlink(NAME);
+}
+
+void client_main()
+{
+    int i;
+    int sock;
+    struct sockaddr_un server;
+    struct timeval start_tv = {0, 0};
+    struct timeval end_tv = {0, 0};
+    struct timeval diff_tv = {0, 0};
+    struct timeval prev_sum_tv = {0, 0};
+    struct timeval sum_tv = {0, 0};
+    int count = 0;
+
+    sock = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (sock < 0) {
+        perror("opening stream socket");
+        exit(1);
+    }
+    server.sun_family = AF_UNIX;
+    strcpy(server.sun_path, NAME);
+
+    for (i = 0; i < BUFFER_SIZE; i++)
+        buffer[i] = 'a';
+    buffer[i] = 0;
+
+    if (connect(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) {
+        close(sock);
+        perror("connecting stream socket");
+        exit(1);
+    }
+    for (i = 0; i < REPEAT_COUNT; i++)
+    {
+
+        gettimeofday(&start_tv, NULL);
+        if (write(sock, buffer, BUFFER_SIZE) < 0)
+            perror("writing on stream socket");
+        if (read(sock, buffer, BUFFER_SIZE) < 0)
+            perror("read on stream socket");
+        gettimeofday(&end_tv, NULL);
+        MINUS_TIME(diff_tv, end_tv, start_tv);
+        PLUS_TIME(sum_tv, diff_tv, prev_sum_tv);
+        prev_sum_tv = sum_tv;
+        count++;
+        printf("start[%lu:%lu] end[%lu:%lu] diff[%lu:%lu]\n",
+                start_tv.tv_sec, start_tv.tv_usec,
+                end_tv.tv_sec, end_tv.tv_usec,
+                diff_tv.tv_sec, diff_tv.tv_usec);
+
+    }
+    if (i == REPEAT_COUNT)
+    {
+        printf("sum[%lu:%lu] count[%d]\n",
+                sum_tv.tv_sec, sum_tv.tv_usec, count);
+        printf("avg[%lu:%lu]\n",
+                sum_tv.tv_sec / count, (sum_tv.tv_sec % count * 1000000 + sum_tv.tv_usec) / count);
+    }
+
+    close(sock);
+}
+
+int main(int argc, char *argv[])
+{
+    if (argc != 2)
+    {
+        printf("Usage: %s client|server\n", argv[0]);
+        return -1;
+    }
+    if (strcmp(argv[1], "client") == 0)
+    {
+        printf("client mode..\n");
+        client_main();
+    }
+    else if (strcmp(argv[1], "server") == 0)
+    {
+        printf("server mode..\n");
+        server_main();
+    }
+    else
+    {
+        printf("Usage: %s client|server\n", argv[0]);
+        return -1;
+    }
+    return 0;
+}
+
+
diff --git a/test/test.c b/test/test.c
new file mode 100644 (file)
index 0000000..6335a38
--- /dev/null
@@ -0,0 +1,586 @@
+/*
+ * PIMS IPC
+ *
+ * Copyright (c) 2012 Samsung Electronics Co., Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <glib.h>
+#include <pims-ipc.h>
+#include <pims-ipc-svc.h>
+#include <pims-ipc-data.h>
+
+#define THREAD_COUNT    1
+#define REPEAT_COUNT    100
+#define BUFFER_SIZE 1024
+#define PLUS_TIME(a, b, c) do {\
+    a.tv_sec = b.tv_sec + c.tv_sec;\
+    a.tv_usec = b.tv_usec + c.tv_usec;\
+    if (a.tv_usec >= 1000000) {\
+        a.tv_sec++;\
+        a.tv_usec -= 1000000;\
+    }\
+} while (0)
+
+#define MINUS_TIME(a, b, c) do {\
+    a.tv_sec = b.tv_sec - c.tv_sec;\
+    a.tv_usec = b.tv_usec - c.tv_usec;\
+    if (a.tv_usec < 0) {\
+        a.tv_sec--;\
+        a.tv_usec += 1000000;\
+    }\
+} while (0)
+
+typedef struct {
+    int thread_count;
+    int repeat_count;
+    int message_size;
+    pims_ipc_h ipc;
+} test_arg_t;
+
+static gboolean __is_async = FALSE;
+static gboolean __is_publish = FALSE;
+
+void test_function(pims_ipc_h ipc, pims_ipc_data_h indata, pims_ipc_data_h *outdata, void *userdata)
+{
+    unsigned int size = 0;
+    char *str = NULL;
+
+    if (indata)
+    {
+        str = (char*)pims_ipc_data_get(indata, &size);
+        if (!str)
+        {
+            printf("pims_ipc_data_get error\n");
+            return;
+        }
+#if 0
+        str = (char*)pims_ipc_data_get(indata, &size);
+        if (!str)
+        {
+            printf("pims_ipc_data_get error\n");
+            return;
+        }
+        printf("%s\n", str);
+        str = (char*)pims_ipc_data_get(indata, &size);
+        if (!str)
+        {
+            printf("pims_ipc_data_get error\n");
+            return;
+        }
+        printf("%s\n", str);
+#endif
+    }
+
+    if (str && outdata)
+    {
+        *outdata = pims_ipc_data_create(0);
+        if (!*outdata)
+        {
+            printf("pims_ipc_data_create error\n");
+            return;
+        }
+        if (pims_ipc_data_put(*outdata, str, strlen(str) + 1) != 0)
+        {
+            printf("pims_ipc_data_put error\n");
+            pims_ipc_data_destroy(*outdata);
+            *outdata = NULL;
+            return;
+        }
+#if 0
+        if (pims_ipc_data_put(*outdata, "welcome", strlen("welcome") + 1) != 0)
+        {
+            printf("pims_ipc_data_put error\n");
+            pims_ipc_data_destroy(*outdata);
+            *outdata = NULL;
+            return;
+        }
+        if (pims_ipc_data_put(*outdata, "to the jungle", strlen("to the jungle") + 1) != 0)
+        {
+            printf("pims_ipc_data_put error\n");
+            pims_ipc_data_destroy(*outdata);
+            *outdata = NULL;
+            return;
+        }
+#endif
+    }
+}
+
+static gboolean publish_callback(gpointer data)
+{
+    pims_ipc_data_h indata = NULL;
+
+    indata = pims_ipc_data_create(0);
+    if (!indata)
+    {
+        printf("pims_ipc_data_create error\n");
+        return false;
+    }
+    if (pims_ipc_data_put(indata, "publish test", strlen("publish test") + 1) != 0)
+    {
+        printf("pims_ipc_data_put error\n");
+        return false;
+    }
+    if (pims_ipc_svc_publish("test_module", "publish", indata) != 0)
+    {
+        printf("pims_ipc_svc_publish error\n");
+        return false;
+    }
+
+    if (indata)
+        pims_ipc_data_destroy(indata);
+
+    return true;
+}
+
+int server_main()
+{
+
+    pims_ipc_svc_init("pims-ipc-test", getuid(), 0660);
+
+    if (pims_ipc_svc_register("test_module", "test_function", test_function, NULL) != 0)
+    {
+        printf("pims_ipc_svc_register error\n");
+        return -1;
+    }
+
+    if (__is_publish)
+    {
+        pims_ipc_svc_init_for_publish("pims-ipc-pub-test", getuid(), 0660);
+        g_timeout_add_seconds(3, publish_callback, NULL);
+    }
+
+    pims_ipc_svc_run_main_loop(NULL);
+
+    pims_ipc_svc_deinit();
+
+    return 0;
+}
+
+static gboolean call_async_idler_callback(gpointer data);
+
+static void call_async_callback(pims_ipc_h ipc, pims_ipc_data_h data_out, void *userdata)
+{
+    unsigned int size = 0;
+    char *str = NULL;
+
+    printf("(%x) call_async_callback(%p)", (unsigned int)pthread_self(), ipc);
+    if (data_out)
+    {
+        str = (char*)pims_ipc_data_get(data_out, &size);
+        if (!str)
+        {
+            printf("pims_ipc_data_get error\n");
+        }
+        else
+        {
+            printf("pims_ipc_data_get(%s) success\n", str);
+        }
+    }
+
+    sleep(1);
+
+    pims_ipc_data_h indata = NULL;
+    pims_ipc_data_h outdata = NULL;
+
+    indata = pims_ipc_data_create(0);
+    if (!indata)
+    {
+        printf("pims_ipc_data_create error\n");
+        return;
+    }
+    if (pims_ipc_data_put(indata, "hello world", strlen("hello world") + 1) != 0)
+    {
+        printf("pims_ipc_data_put error\n");
+        return;
+    }
+
+    if (pims_ipc_call(ipc, "test_module", "test_function", indata, &outdata) != 0)
+    {
+        printf("pims_ipc_call error\n");
+        return;
+    }
+    
+    if (indata)
+        pims_ipc_data_destroy(indata);
+    if (outdata)
+        pims_ipc_data_destroy(outdata);
+
+    sleep(1);
+
+    call_async_idler_callback(ipc);
+}
+
+static gboolean call_async_idler_callback(gpointer data)
+{
+    pims_ipc_data_h indata = NULL;
+    pims_ipc_data_h outdata = NULL;
+    pims_ipc_h ipc = data;
+    bool ret = false;
+
+    indata = pims_ipc_data_create(0);
+    if (!indata)
+    {
+        printf("pims_ipc_data_create error\n");
+        return FALSE;
+    }
+    if (pims_ipc_data_put(indata, "hello world", strlen("hello world") + 1) != 0)
+    {
+        printf("pims_ipc_data_put error\n");
+        return FALSE;
+    }
+
+    printf("(%x) call_async_idler_callback(%p)\n", (unsigned int)pthread_self(), ipc);
+    ret = pims_ipc_is_call_in_progress(ipc);
+    printf("(%x) before async call : pims_ipc_is_call_in_progress(%p) = %d\n", (unsigned int)pthread_self(), ipc, ret);
+    if (pims_ipc_call_async(ipc, "test_module", "test_function", indata, call_async_callback, NULL) != 0)
+    {
+        printf("pims_ipc_call_async error\n");
+        return FALSE;
+    }
+    ret = pims_ipc_is_call_in_progress(ipc);
+    printf("(%x) after async call : pims_ipc_is_call_in_progress(%p) = %d\n", (unsigned int)pthread_self(), ipc, ret);
+
+    if (pims_ipc_call(ipc, "test_module", "test_function", indata, &outdata) != 0)
+    {
+        printf("pims_ipc_call error during async-call\n");
+        return FALSE;
+    }
+    if (indata)
+        pims_ipc_data_destroy(indata);
+    if (outdata)
+        pims_ipc_data_destroy(outdata);
+
+    return FALSE;
+}
+
+int client_main(pims_ipc_h ipc, int repeat_count, int message_size)
+{
+    pims_ipc_data_h indata = NULL;
+    pims_ipc_data_h outdata = NULL;
+    int retval = -1;
+    unsigned int size = 0;
+    char *str = NULL;
+    int i = 0;
+    struct timeval start_tv = {0, 0};
+    struct timeval end_tv = {0, 0};
+    struct timeval diff_tv = {0, 0};
+    struct timeval prev_sum_tv = {0, 0};
+    struct timeval sum_tv = {0, 0};
+    int count = 0;
+    
+    char *buffer = g_malloc0(message_size + 1);
+
+    for (i = 0; i < message_size; i++)
+        buffer[i] = 'a';
+    buffer[i] = 0;
+
+    for (i = 0; i < repeat_count; i++)
+    {
+        gettimeofday(&start_tv, NULL);
+        indata = pims_ipc_data_create(0);
+        if (!indata)
+        {
+            printf("pims_ipc_data_create error\n");
+            break;
+        }
+        if (pims_ipc_data_put(indata, buffer, strlen(buffer) + 1) != 0)
+        {
+            printf("pims_ipc_data_put error\n");
+            break;
+        }
+#if 0
+        if (pims_ipc_data_put(indata, "hellow", strlen("hellow") + 1) != 0)
+        {
+            printf("pims_ipc_data_put error\n");
+            break;
+        }
+        if (pims_ipc_data_put(indata, "world", strlen("world") + 1) != 0)
+        {
+            printf("pims_ipc_data_put error\n");
+            break;
+        }
+#endif
+        if (pims_ipc_call(ipc, "test_module", "test_function", indata, &outdata) != 0)
+        {
+            printf("pims_ipc_call error\n");
+            break;
+        }
+        pims_ipc_data_destroy(indata);
+        indata = NULL;
+        if (outdata)
+        {
+            str = (char*)pims_ipc_data_get(outdata, &size);
+            if (!str)
+            {
+                printf("pims_ipc_data_get error\n");
+                break;
+            }
+#if 0
+            str = (char*)pims_ipc_data_get(outdata, &size);
+            if (!str)
+            {
+                printf("pims_ipc_data_get error\n");
+                break;
+            }
+            printf("%s\n", str);
+            str = (char*)pims_ipc_data_get(outdata, &size);
+            if (!str)
+            {
+                printf("pims_ipc_data_get error\n");
+                break;
+            }
+            printf("%s\n", str);
+#endif
+
+            pims_ipc_data_destroy(outdata);
+            outdata = NULL;
+        }
+        gettimeofday(&end_tv, NULL);
+        MINUS_TIME(diff_tv, end_tv, start_tv);
+        PLUS_TIME(sum_tv, diff_tv, prev_sum_tv);
+        prev_sum_tv = sum_tv;
+        count++;
+        printf("(%x) start[%lu:%lu] end[%lu:%lu] diff[%lu:%lu]\n",
+                (unsigned int)pthread_self(),
+                start_tv.tv_sec, start_tv.tv_usec,
+                end_tv.tv_sec, end_tv.tv_usec,
+                diff_tv.tv_sec, diff_tv.tv_usec);
+    }
+
+    if (i == repeat_count)
+    {
+        printf("(%x) sum[%lu:%lu] count[%d]\n",
+                (unsigned int)pthread_self(),
+                sum_tv.tv_sec, sum_tv.tv_usec, count);
+        printf("(%x) avg[%lu:%lu]\n",
+                (unsigned int)pthread_self(),
+                sum_tv.tv_sec / count, (sum_tv.tv_sec % count * 1000000 + sum_tv.tv_usec) / count);
+        retval = 0;
+    }
+
+    if (indata)
+        pims_ipc_data_destroy(indata);
+    if (outdata)
+        pims_ipc_data_destroy(outdata);
+
+    if (__is_async)
+        g_idle_add(call_async_idler_callback, ipc);
+
+    return retval;
+}
+
+static void* __worker_task(void *arg)
+{
+    test_arg_t *_arg = arg;
+    printf("(%x) worker %p, %d, %d\n", (unsigned int)pthread_self(), _arg->ipc, _arg->repeat_count, _arg->message_size);
+    client_main(_arg->ipc, _arg->repeat_count, _arg->message_size);
+    printf("worker task has been done\n");
+    return NULL;
+}
+
+static gboolean __launch_worker(gpointer data)
+{
+    test_arg_t *_arg = data;
+    int i = 0;
+    GThread **worker_threads = NULL;
+    gboolean joinable = FALSE;
+
+    worker_threads = g_new0(GThread*, _arg->thread_count);
+    if (!__is_async && !__is_publish)
+        joinable = TRUE;
+
+    for (i = 0; i < _arg->thread_count; i++)
+    {
+        pims_ipc_h ipc = NULL;
+        test_arg_t *arg = g_new0(test_arg_t, 1);
+        arg->repeat_count = _arg->repeat_count;
+        arg->message_size = _arg->message_size;
+        ipc = pims_ipc_create("pims-ipc-test");
+        if (!ipc)
+        {
+            printf("pims_ipc_create error\n");
+            return -1;
+        }
+        arg->ipc = ipc;
+
+        worker_threads[i] = g_thread_create(__worker_task, (void*)arg, joinable, NULL);
+    }
+
+    if (!__is_async && !__is_publish)
+    {
+        for (i = 0; i < _arg->thread_count; i++)
+        {
+            g_thread_join(worker_threads[i]);
+        }
+    }
+
+    g_free(worker_threads);
+
+    return FALSE;
+}
+
+static void subscribe_callback(pims_ipc_h ipc, pims_ipc_data_h data, void *userdata)
+{
+    unsigned int size = 0;
+    char *str = NULL;
+
+    printf("(%x) subscribe_callback(%p)", (unsigned int)pthread_self(), ipc);
+    if (data)
+    {
+        str = (char*)pims_ipc_data_get(data, &size);
+        if (!str)
+        {
+            printf("pims_ipc_data_get error\n");
+        }
+        else
+        {
+            printf("pims_ipc_data_get(%s) success\n", str);
+        }
+    }
+}
+
+static void __print_usage(char *prog)
+{
+    printf("Usage: %s [-r <repeat count> -s <message size> -t <thread count> -a] client|server \n", prog);
+}
+
+int main(int argc, char *argv[])
+{
+    int repeat_count = REPEAT_COUNT;
+    int message_size = BUFFER_SIZE;
+    int thread_count = THREAD_COUNT;
+    int c = 0;
+    test_arg_t arg;
+
+    opterr = 0;
+    optind = 0;
+
+    g_thread_init(NULL);
+
+    while ((c = getopt(argc, argv, "r:s:t:ap")) != -1)
+    {
+        switch (c)
+        {
+            case 'r':
+                repeat_count = atoi(optarg);
+                break;
+            case 's':
+                message_size = atoi(optarg);
+                break;
+            case 't':
+                thread_count = atoi(optarg);
+                break;
+            case 'a':
+                __is_async = TRUE;
+                break;
+            case 'p':
+                __is_publish = TRUE;
+                break;
+            case '?':
+                __print_usage(argv[0]);
+                return -1;
+        }
+            
+    }
+
+    if (argc - optind != 1)
+    {
+        __print_usage(argv[0]);
+        return -1;
+    }
+
+    if (strcmp(argv[optind], "client") == 0)
+    {
+        // async call 
+        GMainLoop* main_loop = g_main_loop_new(NULL, FALSE);
+
+        printf("client mode.. with %d threads\n", thread_count);
+
+        if (__is_publish)
+        {
+            pims_ipc_h ipc = NULL;
+            ipc = pims_ipc_create_for_subscribe("pims-ipc-pub-test");
+            if (!ipc)
+            {
+                printf("pims_ipc_create_for_subscribe error\n");
+                return -1;
+            }
+            pims_ipc_destroy_for_subscribe(ipc);
+            ipc = pims_ipc_create_for_subscribe("pims-ipc-pub-test");
+            if (!ipc)
+            {
+                printf("pims_ipc_create_for_subscribe error\n");
+                return -1;
+            }
+            if (pims_ipc_subscribe(ipc, "test_module", "publish", subscribe_callback, NULL) != 0) 
+            {
+                printf("pims_ipc_subscribe error\n");
+                return -1;
+            }
+            if (pims_ipc_unsubscribe(ipc, "test_module", "publish") != 0) 
+            {
+                printf("pims_ipc_unsubscribe error\n");
+                return -1;
+            }
+            if (pims_ipc_subscribe(ipc, "test_module", "publish", subscribe_callback, NULL) != 0) 
+            {
+                printf("pims_ipc_subscribe error\n");
+                return -1;
+            }
+        }
+
+        if (thread_count <= 1)
+        {
+            pims_ipc_h ipc = NULL;
+            ipc = pims_ipc_create("pims-ipc-test");
+            if (!ipc)
+            {
+                printf("pims_ipc_create error\n");
+                return -1;
+            }
+
+            client_main(ipc, repeat_count, message_size);
+        }
+        else
+        {
+            arg.message_size = message_size; 
+            arg.repeat_count = repeat_count; 
+            arg.thread_count = thread_count; 
+            if (__is_async || __is_publish)
+                g_idle_add(__launch_worker, &arg);
+            else
+                __launch_worker(&arg);
+        }
+
+        if (__is_async || __is_publish)
+            g_main_loop_run(main_loop);
+    }
+    else if (strcmp(argv[optind], "server") == 0)
+    {
+        printf("server mode..\n");
+        server_main();
+    }
+    else
+    {
+        __print_usage(argv[0]);
+        return -1;
+    }
+    return 0;
+}