4 * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <poll.h> // pollfds
28 #include <sys/un.h> // sockaddr_un
29 #include <sys/ioctl.h> // ioctl
30 #include <sys/socket.h> //socket
31 #include <sys/types.h>
32 #include <sys/epoll.h> // epoll
33 #include <sys/eventfd.h> // eventfd
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-debug.h"
40 #include "pims-ipc-data.h"
41 #include "pims-ipc-data-internal.h"
/* Advance (handle)->call_sequence_no by one and store the new value in sequence_no. */
44 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
45 sequence_no = ++((handle)->call_sequence_no);\
/* File-wide lock: held while a handle is created or freed and while subscribe events are dispatched. */
48 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
/* Call state of a handle: pims_ipc_call_async() sets IN_PROGRESS, the response handlers reset it to READY. */
52 PIMS_IPC_CALL_STATUS_READY = 0,
53 PIMS_IPC_CALL_STATUS_IN_PROGRESS
54 } pims_ipc_call_status_e;
/* Handle mode: request mode is 0; PIMS_IPC_MODE_SUB is the other mode used in this file. */
58 PIMS_IPC_MODE_REQ = 0,
/* Subscriber callback; presumably a member of pims_ipc_cb_s, whose callback/user_data fields are used in this file -- TODO confirm. */
64 pims_ipc_subscribe_cb callback;
/*
 * One queued subscribe event. `handle` is the payload that __sub_data_free()
 * releases with pims_ipc_data_destroy().
 * NOTE(review): declared as pims_ipc_data_h * but assigned a pims_ipc_data_h in
 * __subscribe_data(); confirm the intended type.
 */
71 pims_ipc_data_h *handle;
72 }pims_ipc_subscribe_data_s;
/* GIOChannel watched on the GLib main loop (wraps subscribe_fd for subscribe handles, fd for async calls). */
79 GIOChannel *async_channel;
/* Source id returned by g_io_add_watch() for async_channel. */
80 guint async_source_id;
/* Guards call_status. */
81 pthread_mutex_t call_status_mutex;
82 pims_ipc_call_status_e call_status;
/* Incremented for every outgoing message; a response is accepted only if its sequence number equals this value. */
83 unsigned int call_sequence_no;
/* Callback and user data registered by pims_ipc_call_async(). */
84 pims_ipc_call_async_cb call_async_callback;
85 void *call_async_userdata;
/* Response parked for __call_async_idler_cb() when it is received before the main loop dispatches the watch. */
86 pims_ipc_data_h dhandle_for_async_idler;
/* Stop flag polled by __io_thread(). NOTE(review): read there without visible synchronization -- confirm this is acceptable. */
89 int epoll_stop_thread;
/* Maps call_id strings to pims_ipc_cb_s entries; created with g_free for both keys and values. */
91 GHashTable *subscribe_cb_table;
/* Guards data_queue. */
93 pthread_mutex_t data_queue_mutex;
/* Count of live handles; decremented in __pims_ipc_free_handle(). Unsigned, so it cannot go negative. */
97 static unsigned int ref_cnt;
/* Handles created for subscription; consulted by __pims_ipc_subscribe_handler() under __gmutex. */
98 static GList *subscribe_handles;
/* Server-disconnected notification: callback plus the user_data passed back to it. */
101 pims_ipc_server_disconnected_cb callback;
103 } pims_ipc_server_disconnected_cb_t;
/* Single process-wide registration, empty by default. */
105 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
/* Guards _server_disconnected_cb against concurrent set/unset/invoke. */
106 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Destroy notifier for one queued pims_ipc_subscribe_data_s (passed to
 * g_list_free_full() and called directly after an event is dispatched).
 * Releases the payload in data->handle via pims_ipc_data_destroy().
 */
108 static void __sub_data_free(gpointer user_data)
110 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
111 pims_ipc_data_destroy(data->handle);
/*
 * Tear down a handle while holding __gmutex: ask the I/O thread to stop and
 * join it, drop the GLib watch/channel, destroy the subscription table and any
 * queued events, close the subscribe fd, destroy the per-handle mutexes and
 * finally decrement the global handle count.
 */
116 static void __pims_ipc_free_handle(pims_ipc_s *handle)
118 pthread_mutex_lock(&__gmutex);
// signal __io_thread() to leave its epoll loop before it is joined below
120 handle->epoll_stop_thread = true;
122 if (handle->fd != -1)
125 if (handle->io_thread)
126 pthread_join(handle->io_thread, NULL);
129 g_free(handle->service);
131 if (handle->async_channel) {
132 // remove a subscriber handle from the global list
133 subscribe_handles = g_list_remove(subscribe_handles, handle);
134 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
136 g_source_remove(handle->async_source_id);
137 g_io_channel_unref(handle->async_channel);
140 if (handle->subscribe_cb_table)
141 g_hash_table_destroy(handle->subscribe_cb_table);
// NOTE(review): data_queue_mutex appears to be initialised only on the subscribe
// path of __pims_ipc_create(); confirm locking it is valid for request-mode handles.
143 pthread_mutex_lock(&handle->data_queue_mutex);
144 if (handle->data_queue) {
145 g_list_free_full(handle->data_queue, __sub_data_free);
147 pthread_mutex_unlock(&handle->data_queue_mutex);
148 pthread_mutex_destroy(&handle->data_queue_mutex);
150 if (handle->subscribe_fd != -1)
151 close(handle->subscribe_fd);
153 pthread_mutex_destroy(&handle->call_status_mutex);
// ref_cnt is unsigned, so `<= 0` only matches 0; decrementing from 0 would wrap around
157 if (--ref_cnt <= 0) {
158 if (subscribe_handles)
159 g_list_free(subscribe_handles);
160 subscribe_handles = NULL;
163 pthread_mutex_unlock(&__gmutex);
/*
 * Consume the wake-up from handle->subscribe_fd and dispatch queued subscribe
 * events: take the head of handle->data_queue, look up its call_id in
 * subscribe_cb_table, invoke the registered callback with the event payload,
 * then unlink the entry and release it with __sub_data_free().
 */
166 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
168 pims_ipc_cb_s *cb_data = NULL;
172 read_command(handle->subscribe_fd, &dummy);
174 pthread_mutex_lock(&handle->data_queue_mutex);
// nothing queued: release the lock
175 if (!handle->data_queue) {
176 pthread_mutex_unlock(&handle->data_queue_mutex);
180 GList *cursor = g_list_first(handle->data_queue);
181 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
183 pthread_mutex_unlock(&handle->data_queue_mutex);
187 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
188 if (cb_data == NULL) {
// NOTE(review): `call_id` is not declared in the visible part of this function;
// data->call_id may have been intended -- confirm.
189 VERBOSE("unable to find %s", call_id);
// NOTE(review): the user callback appears to run with data_queue_mutex held -- confirm.
192 cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
194 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
195 __sub_data_free(data);
196 pthread_mutex_unlock(&handle->data_queue_mutex);
/*
 * GLib I/O watch callback registered on the subscribe channel by
 * __pims_ipc_create(); `data` is the pims_ipc_s handle. Under __gmutex it
 * verifies the handle is still in subscribe_handles before dispatching queued
 * events through __pims_ipc_receive_for_subscribe().
 */
202 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
204 pims_ipc_s *handle = (pims_ipc_s *)data;
208 if (condition & G_IO_HUP)
211 pthread_mutex_lock(&__gmutex);
213 // check whether this subscriber handle still exists
214 if (g_list_find(subscribe_handles, handle) == NULL) {
215 ERROR("No such handle that ID is %p", handle);
216 pthread_mutex_unlock(&__gmutex);
220 __pims_ipc_receive_for_subscribe(handle);
222 pthread_mutex_unlock(&__gmutex);
/*
 * Return a sequence number kept in a function-local static. The value
 * 0xffffffff marks "not yet seeded"; in that state it is seeded from time(NULL).
 * Used by __pims_ipc_create() to build the client id string.
 */
227 static unsigned int __get_global_sequence_no()
229 static unsigned int __gsequence_no = 0xffffffff;
231 if (__gsequence_no == 0xffffffff)
232 __gsequence_no = (unsigned int)time(NULL);
235 return __gsequence_no;
/*
 * Build and send the identify message for a request-mode handle.
 * Layout, each integer a native unsigned int:
 *   [total length][client_id length][client_id bytes][sequence number]
 * Returns the result of socket_send().
 */
238 static int __pims_ipc_send_identify(pims_ipc_s *handle)
240 unsigned int sequence_no;
241 unsigned int client_id_len = strlen(handle->id);
242 unsigned int len = sizeof(unsigned int) // total size
243 + client_id_len + sizeof(unsigned int) // client_id
244 + sizeof(unsigned int) ; // seq_no
248 memset(buf, 0x0, len+1);
// total length goes first
251 memcpy(buf, (void*)&len, sizeof(unsigned int));
252 length += sizeof(unsigned int);
// length-prefixed client id
255 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
256 length += sizeof(unsigned int);
257 memcpy(buf+length, (void*)(handle->id), client_id_len);
258 length += client_id_len;
// bumps handle->call_sequence_no; the reply is matched against the new value
261 GET_CALL_SEQUNECE_NO(handle, sequence_no);
262 memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
263 length += sizeof(unsigned int);
265 return socket_send(handle->fd, buf, length);
/*
 * Read one response message from handle->fd.
 * Fields are read in this order: total length, client_id length and bytes,
 * sequence number. If that already accounts for total length, an empty data
 * handle carrying the client id is created; the further reads are call_id
 * length and bytes, an is_data flag, and a data length and bytes that are
 * unmarshalled with pims_ipc_data_steal_unmarshal().
 * The response is matched against handle->call_sequence_no; on mismatch the
 * data is destroyed. data_out may be NULL (it is checked before use).
 *
 * NOTE(review): the read() results are summed into an unsigned read_len with no
 * error check in the visible code, and peer-supplied lengths are bounded only
 * by UINT_MAX-1 before allocation -- confirm this is acceptable for the peer.
 */
268 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
271 gboolean is_ok = FALSE;
273 pims_ipc_data_h data = NULL;
274 unsigned int sequence_no = 0;
275 char *client_id = NULL;
276 char *call_id = NULL;
279 /* read the size of message. note that ioctl is non-blocking */
280 if (ioctl(handle->fd, FIONREAD, &len)) {
281 ERROR("ioctl failed: %d", errno);
285 /* when server or client closed socket */
287 ERROR("[IPC Socket] connection is closed");
292 unsigned int read_len = 0;
293 unsigned int total_len = 0;
294 unsigned int client_id_len = 0;
295 unsigned int call_id_len = 0;
296 unsigned int is_data = FALSE;
299 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
302 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int)));
303 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
304 client_id = calloc(1, client_id_len+1);
305 if (client_id == NULL) {
306 ERROR("calloc fail");
// socket_recv is handed the address of the buffer pointer, not the buffer itself
312 ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len);
313 if (ret < 0) { ERROR("socket_recv error"); break; }
317 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int)));
// header-only message: nothing follows the sequence number
318 if (total_len == read_len) {
320 data = pims_ipc_data_create(0);
322 ERROR("pims_ipc_data_create() Fail");
325 ret = pims_ipc_data_put(data, client_id, client_id_len);
327 WARNING("pims_ipc_data_put fail(%d)", ret);
331 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
332 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
333 call_id = calloc(1, call_id_len+1);
334 if (call_id == NULL) {
335 ERROR("calloc fail");
342 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
343 if (ret < 0) { ERROR("socket_recv error"); break; }
346 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
348 unsigned int data_len;
349 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
350 if (data_len > 0 && data_len < UINT_MAX-1) {
351 buf = calloc(1, data_len+1);
353 ERROR("calloc fail");
359 ret = socket_recv(handle->fd, (void *)&(buf), data_len);
360 if (ret < 0) { ERROR("socket_recv error"); break; }
// "steal": presumably buf ownership moves into the returned data handle -- TODO confirm
363 data = pims_ipc_data_steal_unmarshal(buf, data_len);
365 ERROR("pims_ipc_data_steal_unmarshal() Fail");
371 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no);
// accept the message only if it answers the most recently sent request
377 if (sequence_no == handle->call_sequence_no) {
378 if (data_out != NULL) {
382 pims_ipc_data_destroy(data);
387 pims_ipc_data_destroy(data);
388 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
/*
 * Wait for a response on handle->fd using poll() with a 1000 ms timeout and,
 * when the socket is readable, read it with __pims_ipc_read_data().
 * POLLERR/POLLHUP is reported as a server disconnect. A poll() failure with
 * EINTR, EAGAIN or EWOULDBLOCK is singled out from other errors.
 */
397 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
400 struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd));
401 if (NULL == pollfds) {
402 ERROR("calloc() Fail");
406 pollfds[0].fd = handle->fd;
407 pollfds[0].events = POLLIN | POLLERR | POLLHUP;
411 ret = poll(pollfds, 1, 1000);
412 if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
419 if (pollfds[0].revents & (POLLERR|POLLHUP)) {
420 ERROR("Server disconnected");
424 if (pollfds[0].revents & POLLIN) {
425 ret = __pims_ipc_read_data(handle, data_out);
/*
 * Create the eventfd used to wake the GLib main loop when the I/O thread has
 * queued a subscribe event, add O_NONBLOCK to its flags, and store it in
 * handle->subscribe_fd.
 */
434 static int __open_subscribe_fd(pims_ipc_s *handle)
436 // router inproc eventfd
437 int subscribe_fd = eventfd(0,0);
441 if (-1 == subscribe_fd) {
442 ERROR("eventfd error : %d", errno);
445 VERBOSE("subscribe :%d\n", subscribe_fd);
// keep the existing flags and add O_NONBLOCK
447 flags = fcntl (subscribe_fd, F_GETFL, 0);
450 ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK);
451 VERBOSE("subscribe fcntl : %d\n", ret);
453 handle->subscribe_fd = subscribe_fd;
/*
 * Read one published event from handle->fd and queue it for the main loop.
 * Fields are read in this order: total length, call_id length and bytes, an
 * is_data flag, then a data length and bytes that are unmarshalled with
 * pims_ipc_data_steal_unmarshal(). The event is wrapped in a
 * pims_ipc_subscribe_data_s, appended to handle->data_queue under
 * data_queue_mutex, and subscribe_fd is signalled with write_command().
 * Called from __io_thread(), which treats a negative return as a closed server.
 *
 * NOTE(review): the read() results are summed into an unsigned read_len with no
 * error check in the visible code -- confirm.
 */
457 static int __subscribe_data(pims_ipc_s * handle)
461 char *call_id = NULL;
463 pims_ipc_data_h dhandle = NULL;
466 /* read the size of message. note that ioctl is non-blocking */
467 if (ioctl(handle->fd, FIONREAD, &len)) {
468 ERROR("ioctl failed: %d", errno);
472 /* when server or client closed socket */
474 INFO("[IPC Socket] connection is closed");
479 unsigned int read_len = 0;
480 unsigned int total_len = 0;
481 unsigned int call_id_len = 0;
482 unsigned int is_data = FALSE;
485 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
488 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
489 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
490 call_id = calloc(1, call_id_len+1);
491 if (call_id == NULL) {
492 ERROR("calloc fail");
// socket_recv is handed the address of the buffer pointer, not the buffer itself
499 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
500 if (ret < 0) { ERROR("socket_recv error"); break; }
504 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
507 unsigned int data_len;
508 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
509 if (data_len > 0 && data_len < UINT_MAX-1) {
510 buf = calloc(1, data_len+1);
512 ERROR("calloc fail");
518 ret = socket_recv(handle->fd, (void *)&(buf), data_len);
520 ERROR("socket_recv error");
525 dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
526 if (NULL == dhandle) {
527 ERROR("pims_ipc_data_steal_unmarshal() Fail");
532 pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s));
533 if (NULL == sub_data) {
534 ERROR("calloc() Fail");
535 pims_ipc_data_destroy(dhandle);
// the queue entry now refers to both the payload and the call_id string
539 sub_data->handle = dhandle;
540 sub_data->call_id = call_id;
543 pthread_mutex_lock(&handle->data_queue_mutex);
544 handle->data_queue = g_list_append(handle->data_queue, sub_data);
545 pthread_mutex_unlock(&handle->data_queue_mutex);
// wake the main-loop watch on subscribe_fd
546 write_command(handle->subscribe_fd, 1);
/*
 * Idle callback scheduled by __io_thread() via g_idle_add() when the server
 * connection is lost. Invokes the registered server-disconnected callback, if
 * any, while holding __disconnect_cb_mutex. `data` is not used in the visible code.
 */
556 static gboolean __hung_up_cb(gpointer data)
558 pthread_mutex_lock(&__disconnect_cb_mutex);
559 if (_server_disconnected_cb.callback)
560 _server_disconnected_cb.callback(_server_disconnected_cb.user_data);
561 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Worker thread entry for subscribe handles; `data` is the pims_ipc_s handle.
 * Registers handle->fd with a private epoll instance and loops, with a 50 ms
 * epoll_wait() timeout, until handle->epoll_stop_thread is set.
 *   EPOLLHUP: schedule __hung_up_cb() on the main loop and stop the loop.
 *   EPOLLIN:  read and queue one event with __subscribe_data(); a negative
 *             result also schedules __hung_up_cb().
 * NOTE(review): the epoll_create() result is not checked in the visible code -- confirm.
 */
565 static void* __io_thread(void *data)
567 pims_ipc_s *handle = data;
568 struct epoll_event ev = {0};
572 epfd = epoll_create(MAX_EPOLL_EVENT);
574 ev.events = EPOLLIN | EPOLLHUP;
575 ev.data.fd = handle->fd;
577 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
578 WARN_IF(ret != 0, "listen error :%d", ret);
580 while (!handle->epoll_stop_thread) {
582 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
583 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
// re-check after the wait: the flag may have been set while blocked
585 if (handle->epoll_stop_thread)
588 if (event_num == -1) {
589 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
590 ERROR("errno:%d\n", errno);
595 for (i = 0; i < event_num; i++) {
596 if (events[i].events & EPOLLHUP) {
597 ERROR("server fd closed");
598 g_idle_add(__hung_up_cb, NULL);
599 handle->epoll_stop_thread = true;
603 if (events[i].events & EPOLLIN) {
604 if(__subscribe_data(handle) < 0) {
605 ERROR("server fd closed");
606 g_idle_add(__hung_up_cb, NULL);
/*
 * Create a handle connected to the UNIX-domain socket named by `service`.
 * Common setup (under __gmutex): allocate the handle, build the client id from
 * the pid and a global sequence number, create a non-blocking stream socket
 * and connect it.
 * Request mode: seed call_sequence_no from time(NULL), send the identify
 * message, wait for its reply, then issue the internal CREATE call.
 * Subscribe setup: initialise the event queue, open the subscribe eventfd,
 * start __io_thread(), watch the eventfd on the GLib main loop, create the
 * callback table and add the handle to subscribe_handles.
 * When setup did not complete (is_ok stays FALSE) the handle is released with
 * __pims_ipc_free_handle() after __gmutex has been unlocked.
 */
618 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
620 pims_ipc_s *handle = NULL;
621 gboolean is_ok = FALSE;
623 pthread_mutex_lock(&__gmutex);
626 struct sockaddr_un server_addr;
630 VERBOSE("Create %d th..", ref_cnt);
632 handle = g_new0(pims_ipc_s, 1);
633 if (handle == NULL) {
634 ERROR("Failed to allocation");
// -1 marks "no subscribe fd" for __pims_ipc_free_handle()
638 handle->subscribe_fd = -1;
639 handle->io_thread = 0;
640 handle->service = g_strdup(service);
641 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
642 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
643 if (handle->fd < 0) {
644 ERROR("socket error : %d, errno: %d", handle->fd, errno);
647 int flags = fcntl (handle->fd, F_GETFL, 0);
650 ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK);
651 VERBOSE("socket fcntl : %d\n", ret);
653 pthread_mutex_init(&handle->call_status_mutex, 0);
655 pthread_mutex_lock(&handle->call_status_mutex);
656 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
657 pthread_mutex_unlock(&handle->call_status_mutex);
// the service string is used directly as the socket path (truncated to sun_path size)
659 bzero(&server_addr, sizeof(server_addr));
660 server_addr.sun_family = AF_UNIX;
661 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
663 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
665 ERROR("connect error : %d, errno: %d", ret, errno);
668 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
670 if (mode == PIMS_IPC_MODE_REQ) {
671 handle->call_sequence_no = (unsigned int)time(NULL);
672 ret = __pims_ipc_send_identify(handle);
674 ERROR("__pims_ipc_send_identify error");
// wait for the identify reply; its payload is not needed
677 __pims_ipc_receive(handle, NULL);
679 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) {
680 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
684 handle->epoll_stop_thread = false;
685 pthread_mutex_init(&handle->data_queue_mutex, 0);
687 pthread_mutex_lock(&handle->data_queue_mutex);
688 handle->data_queue = NULL;
689 pthread_mutex_unlock(&handle->data_queue_mutex);
691 ret = __open_subscribe_fd(handle);
696 ret = pthread_create(&worker, NULL, __io_thread, handle);
699 handle->io_thread = worker;
701 GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
702 if (!async_channel) {
703 ERROR("g_io_channel_unix_new error");
706 handle->async_channel = async_channel;
707 handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
// the table owns its keys and values: both are released with g_free
708 handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
709 ASSERT(handle->subscribe_cb_table);
711 // add a subscriber handle to the global list
712 subscribe_handles = g_list_append(subscribe_handles, handle);
713 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
717 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
720 pthread_mutex_unlock(&__gmutex);
// __pims_ipc_free_handle() takes __gmutex itself, so it is called after the unlock
722 if (FALSE == is_ok) {
724 __pims_ipc_free_handle(handle);
/* Public API: create a request-mode handle connected to `service`. */
732 API pims_ipc_h pims_ipc_create(char *service)
734 return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
/* Public API: create a subscribe-mode handle connected to `service`. */
737 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
739 return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
/*
 * Destroy a handle. In request mode the internal DESTROY call is sent to the
 * server first (a failure is only logged); the handle is then released with
 * __pims_ipc_free_handle().
 */
742 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
744 pims_ipc_s *handle = (pims_ipc_s *)ipc;
746 if (mode == PIMS_IPC_MODE_REQ) {
747 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) {
748 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
753 __pims_ipc_free_handle(handle);
/* Public API: destroy a handle created with pims_ipc_create(). */
756 API void pims_ipc_destroy(pims_ipc_h ipc)
758 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
/* Public API: destroy a handle created with pims_ipc_create_for_subscribe(). */
761 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
763 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
/*
 * Serialise and send one request for module/function on handle->fd.
 * Header layout, each integer a native unsigned int:
 *   [total length][client_id length][client_id][sequence number]
 *   [call_id length][call_id][is_data]
 * With a payload, a data-size field follows and total length also counts
 * data->buf_size; the header goes out with socket_send() and the payload with
 * socket_send_data().
 * NOTE(review): call_id comes from PIMS_IPC_MAKE_CALL_ID; its release is not
 * visible here -- confirm it is freed on every path.
 */
766 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
769 unsigned int sequence_no = 0;
770 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
771 unsigned int call_id_len = strlen(call_id);
772 pims_ipc_data_s *data = NULL;
773 unsigned int is_data = FALSE;
774 unsigned int client_id_len = strlen(handle->id);
// bumps handle->call_sequence_no; the reply is matched against the new value
777 GET_CALL_SEQUNECE_NO(handle, sequence_no);
779 int len = sizeof(unsigned int) // total size
780 + client_id_len + sizeof(unsigned int) // client_id
781 + sizeof(unsigned int) // seq_no
782 + call_id_len + sizeof(unsigned int) // call_id
783 + sizeof(unsigned int); // is data
789 data = (pims_ipc_data_s*)data_in;
// room for the data-size field; the payload bytes count only toward total_len
790 len += sizeof(unsigned int);
791 total_len = len + data->buf_size;
794 INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no);
798 memset(buf, 0x0, len+1);
800 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
801 length += sizeof(unsigned int);
// recomputes the value client_id_len was already initialised with
804 client_id_len = strlen(handle->id);
805 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
806 length += sizeof(unsigned int);
807 memcpy(buf+length, (void*)(handle->id), client_id_len);
808 length += client_id_len;
811 memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
812 length += sizeof(unsigned int);
815 memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
816 length += sizeof(unsigned int);
817 memcpy(buf+length, (void*)(call_id), call_id_len);
818 length += call_id_len;
822 memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
823 length += sizeof(unsigned int);
826 memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int));
827 length += sizeof(unsigned int);
// header first, then the payload as a separate send
829 ret = socket_send(handle->fd, buf, length);
831 ret = socket_send_data(handle->fd, data->buf, data->buf_size);
// no-payload variant: the header is the whole message
834 ret = socket_send(handle->fd, buf, length);
/*
 * Public API: synchronous call. Validates the handle and the module/function
 * arguments, refuses to run while a previous call is still in progress on the
 * handle, then sends the request and waits for its response.
 * data_in and data_out are passed NULL by the internal CREATE/DESTROY calls.
 */
843 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
844 pims_ipc_data_h *data_out)
846 pims_ipc_s *handle = (pims_ipc_s *)ipc;
850 ERROR("invalid handle : %p", ipc);
854 if (!module || !function) {
855 ERROR("invalid argument");
// reject the call while an asynchronous call is still pending on this handle
859 pthread_mutex_lock(&handle->call_status_mutex);
860 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
861 pthread_mutex_unlock(&handle->call_status_mutex);
862 ERROR("the previous call is in progress : %p", ipc);
865 pthread_mutex_unlock(&handle->call_status_mutex);
868 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
872 if (__pims_ipc_receive(handle, data_out) != 0) {
/*
 * Idle callback used when an async response was received before the main loop
 * dispatched the I/O watch; `data` is the pims_ipc_s handle. Takes the
 * response parked in dhandle_for_async_idler, marks the handle READY, invokes
 * the user's async callback and then destroys the response data.
 */
879 static gboolean __call_async_idler_cb(gpointer data)
883 pims_ipc_s *handle = (pims_ipc_s *)data;
885 ASSERT(handle->dhandle_for_async_idler);
886 pims_ipc_data_h dhandle = handle->dhandle_for_async_idler;
887 handle->dhandle_for_async_idler = NULL;
// status becomes READY before the callback is invoked
889 pthread_mutex_lock(&handle->call_status_mutex);
890 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
891 pthread_mutex_unlock(&handle->call_status_mutex);
893 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
894 pims_ipc_data_destroy(dhandle);
/*
 * Receives the response of an asynchronous call; `data` is the pims_ipc_s
 * handle. Invoked by the GLib I/O watch, and also directly by
 * pims_ipc_call_async() with src == NULL.
 * A response that arrives while no call is in progress is destroyed. With
 * src == NULL the response is parked and delivered from an idle callback;
 * the remaining visible code marks the handle READY, calls the user callback
 * and destroys the response.
 */
899 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
901 pims_ipc_s *handle = (pims_ipc_s *)data;
902 pims_ipc_data_h dhandle = NULL;
904 if (__pims_ipc_receive(handle, &dhandle) == 0) {
905 VERBOSE("call status = %d", handle->call_status);
907 pthread_mutex_lock(&handle->call_status_mutex);
908 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
909 pthread_mutex_unlock(&handle->call_status_mutex);
910 pims_ipc_data_destroy(dhandle);
913 pthread_mutex_unlock(&handle->call_status_mutex);
914 if (src == NULL) { // A response is arrived too quickly
915 handle->dhandle_for_async_idler = dhandle;
916 g_idle_add(__call_async_idler_cb, handle);
919 pthread_mutex_lock(&handle->call_status_mutex);
920 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
921 pthread_mutex_unlock(&handle->call_status_mutex);
923 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
924 pims_ipc_data_destroy(dhandle);
/*
 * Public API: asynchronous call. Validates arguments, marks the handle
 * IN_PROGRESS, records callback/userdata, makes sure a GIOChannel on
 * handle->fd exists, adds a G_IO_IN watch for the response and sends the
 * request. If sending fails the watch is removed again.
 *
 * NOTE(review): the READY check and the switch to IN_PROGRESS happen in two
 * separate critical sections, so two threads could both pass the check --
 * confirm whether concurrent callers are possible.
 */
931 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
932 pims_ipc_call_async_cb callback, void *userdata)
934 pims_ipc_s *handle = (pims_ipc_s *)ipc;
938 ERROR("invalid handle : %p", ipc);
942 if (!module || !function || !callback) {
943 ERROR("invalid argument");
947 pthread_mutex_lock(&handle->call_status_mutex);
948 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
949 pthread_mutex_unlock(&handle->call_status_mutex);
950 ERROR("the previous call is in progress : %p", ipc);
953 pthread_mutex_unlock(&handle->call_status_mutex);
955 pthread_mutex_lock(&handle->call_status_mutex);
956 handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
957 pthread_mutex_unlock(&handle->call_status_mutex);
959 handle->call_async_callback = callback;
960 handle->call_async_userdata = userdata;
962 // add a callback for GIOChannel
963 if (!handle->async_channel) {
964 handle->async_channel = g_io_channel_unix_new(handle->fd);
965 if (!handle->async_channel) {
966 ERROR("g_io_channel_unix_new error");
971 source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
972 handle->async_source_id = source_id;
974 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
975 g_source_remove(source_id);
// try to pick up the response right away; src == NULL selects the idle-delivery path
979 __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
/*
 * Public API: report whether an asynchronous call is currently pending on the
 * handle. call_status is read while holding call_status_mutex.
 */
984 API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
987 pims_ipc_s *handle = (pims_ipc_s *)ipc;
990 ERROR("invalid handle : %p", ipc);
994 pthread_mutex_lock(&handle->call_status_mutex);
995 if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
999 pthread_mutex_unlock(&handle->call_status_mutex);
/*
 * Public API: register `callback` (with `userdata`) for the event identified
 * by module/event on a subscribe handle. The handle must have a
 * subscribe_cb_table. The generated call_id and the new pims_ipc_cb_s are
 * inserted into the table, which releases both with g_free when the entry is
 * removed or replaced.
 */
1003 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
1005 gchar *call_id = NULL;
1006 pims_ipc_cb_s *cb_data = NULL;
1007 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1009 if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1010 ERROR("invalid handle : %p", ipc);
1014 if (!module || !event || !callback) {
1015 ERROR("invalid argument");
1019 cb_data = g_new0(pims_ipc_cb_s, 1);
1020 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1022 VERBOSE("subscribe cb id[%s]", call_id);
1023 cb_data->callback = callback;
1024 cb_data->user_data = userdata;
1025 g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
/*
 * Public API: remove the callback registered for module/event from the
 * handle's subscribe_cb_table. A missing entry is logged as an error.
 * NOTE(review): the temporary call_id built here is a lookup key only; its
 * release is not visible in this code -- confirm it is freed.
 */
1030 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1032 gchar *call_id = NULL;
1033 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1035 if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1036 ERROR("invalid handle : %p", ipc);
1040 if (!module || !event) {
1041 ERROR("invalid argument");
1045 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1047 VERBOSE("unsubscribe cb id[%s]", call_id);
1049 if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1050 ERROR("g_hash_table_remove error");
/* Public API: clear the process-wide server-disconnected callback and its user data under __disconnect_cb_mutex. */
1059 API int pims_ipc_unset_server_disconnected_cb()
1061 pthread_mutex_lock(&__disconnect_cb_mutex);
1062 _server_disconnected_cb.callback = NULL;
1063 _server_disconnected_cb.user_data = NULL;
1064 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Public API: register the process-wide callback that __hung_up_cb() invokes
 * when the server connection is lost. Replaces any previous registration;
 * the update is made under __disconnect_cb_mutex.
 */
1068 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data)
1070 pthread_mutex_lock(&__disconnect_cb_mutex);
1071 _server_disconnected_cb.callback = callback;
1072 _server_disconnected_cb.user_data = user_data;
1073 pthread_mutex_unlock(&__disconnect_cb_mutex);