4 * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <poll.h> // pollfds
28 #include <sys/un.h> // sockaddr_un
29 #include <sys/ioctl.h> // ioctl
30 #include <sys/socket.h> //socket
31 #include <sys/types.h>
32 #include <sys/epoll.h> // epoll
33 #include <sys/eventfd.h> // eventfd
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-debug.h"
40 #include "pims-ipc-data.h"
41 #include "pims-ipc-data-internal.h"
/*
 * GET_CALL_SEQUNECE_NO(handle, sequence_no): pre-increments
 * (handle)->call_sequence_no and stores the new value in sequence_no.
 */
44 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
45 sequence_no = ++((handle)->call_sequence_no);\
48 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER; /* file-wide lock; taken by handle create/free, the subscribe handler and the I/O thread */
/*
 * Call state of a handle: pims_ipc_call_async() moves it to IN_PROGRESS and
 * the async response paths move it back to READY.
 */
52 PIMS_IPC_CALL_STATUS_READY = 0,
53 PIMS_IPC_CALL_STATUS_IN_PROGRESS
54 } pims_ipc_call_status_e;
/* Handle mode; REQ is the mode used by pims_ipc_create() and pims_ipc_destroy(). */
58 PIMS_IPC_MODE_REQ = 0,
64 pims_ipc_subscribe_cb callback; /* user callback stored by pims_ipc_subscribe() */
/*
 * NOTE(review): declared as a pointer to pims_ipc_data_h, yet the code assigns
 * a pims_ipc_data_h to it and passes it to pims_ipc_data_destroy() -- confirm
 * the intended type.
 */
71 pims_ipc_data_h *handle;
72 }pims_ipc_subscribe_data_s; /* one queued event: payload handle plus the call_id it was published under */
/* Fields of the per-connection handle (pims_ipc_s) that are visible in this section. */
79 GIOChannel *async_channel; /* GLib channel: wraps subscribe_fd on subscriber handles, or fd for async calls */
80 guint async_source_id; /* id returned by g_io_add_watch() for async_channel */
81 pthread_mutex_t call_status_mutex; /* guards call_status */
82 pims_ipc_call_status_e call_status;
83 unsigned int call_sequence_no; /* sequence number of the latest request; responses are matched against it */
84 pims_ipc_call_async_cb call_async_callback; /* completion callback of the pending async call */
85 void *call_async_userdata; /* passed back to call_async_callback */
86 pims_ipc_data_h dhandle_for_async_idler; /* response parked for __call_async_idler_cb() */
89 int epoll_stop_thread; /* set to true to ask __io_thread() to exit */
91 GHashTable *subscribe_cb_table; /* call_id string -> pims_ipc_cb_s */
93 pthread_mutex_t data_queue_mutex; /* guards data_queue */
97 static unsigned int ref_cnt; /* decremented in __pims_ipc_free_handle(); the subscriber list is released when it reaches zero */
98 static GList *subscribe_handles; /* live subscriber handles; consulted by __pims_ipc_subscribe_handler() */
101 pims_ipc_server_disconnected_cb callback; /* invoked from __hung_up_cb() */
103 } pims_ipc_server_disconnected_cb_t;
/* Single process-wide disconnect callback and the mutex that guards it. */
105 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
106 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Destructor for one pims_ipc_subscribe_data_s queue entry. It is passed to
 * g_list_free_full() for handle->data_queue and is also called directly after
 * an entry has been dispatched to its subscriber.
 */
108 static void __sub_data_free(gpointer user_data)
110 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
/* Release the payload handle owned by the entry. */
111 pims_ipc_data_destroy(data->handle);
/*
 * Tears down a handle: asks the I/O thread to stop and joins it, then releases
 * the GLib channel and watch, the subscription table, any queued events, the
 * eventfd and the handle's mutexes.
 *
 * This function locks __gmutex itself, so callers must not hold it.
 */
116 static void __pims_ipc_free_handle(pims_ipc_s *handle)
118 pthread_mutex_lock(&__gmutex);
/* Ask __io_thread() to leave its loop. */
120 handle->epoll_stop_thread = true;
122 if (handle->fd != -1)
/* __gmutex is dropped around the join because the I/O thread acquires it on every iteration. */
125 pthread_mutex_unlock(&__gmutex);
126 if (handle->io_thread)
127 pthread_join(handle->io_thread, NULL);
128 pthread_mutex_lock(&__gmutex);
131 g_free(handle->service);
133 if (handle->async_channel) {
134 // remove a subscriber handle from the global list
135 subscribe_handles = g_list_remove(subscribe_handles, handle);
136 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
/* Detach the GLib watch before dropping the channel reference. */
138 g_source_remove(handle->async_source_id);
139 g_io_channel_unref(handle->async_channel);
142 if (handle->subscribe_cb_table)
143 g_hash_table_destroy(handle->subscribe_cb_table);
/* Drop events that were queued by the I/O thread but never dispatched. */
145 pthread_mutex_lock(&handle->data_queue_mutex);
146 if (handle->data_queue) {
147 g_list_free_full(handle->data_queue, __sub_data_free);
149 pthread_mutex_unlock(&handle->data_queue_mutex);
150 pthread_mutex_destroy(&handle->data_queue_mutex);
152 if (handle->subscribe_fd != -1)
153 close(handle->subscribe_fd);
155 pthread_mutex_destroy(&handle->call_status_mutex);
/*
 * NOTE(review): ref_cnt is unsigned, so "<= 0" behaves as "== 0" and a
 * decrement from 0 would wrap around -- confirm the counter can never be 0 here.
 */
159 if (--ref_cnt <= 0) {
160 if (subscribe_handles)
161 g_list_free(subscribe_handles);
162 subscribe_handles = NULL;
165 pthread_mutex_unlock(&__gmutex);
/*
 * Dispatches queued subscription events of a subscriber handle: consumes the
 * eventfd notification, takes the head of handle->data_queue, looks up the
 * callback registered for its call_id and invokes it, then removes and frees
 * the entry.
 */
168 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
170 pims_ipc_cb_s *cb_data = NULL;
/* Consume the wake-up written by __subscribe_data(). */
174 read_command(handle->subscribe_fd, &dummy);
176 pthread_mutex_lock(&handle->data_queue_mutex);
177 if (!handle->data_queue) {
178 pthread_mutex_unlock(&handle->data_queue_mutex);
182 GList *cursor = g_list_first(handle->data_queue);
183 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
185 pthread_mutex_unlock(&handle->data_queue_mutex);
189 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
190 if (cb_data == NULL) {
/*
 * NOTE(review): no declaration of `call_id` is visible in this function;
 * data->call_id is probably what was meant -- confirm.
 */
191 VERBOSE("unable to find %s", call_id);
/*
 * NOTE(review): the user callback appears to run before data_queue_mutex is
 * released below, which would stall the I/O thread for its duration -- confirm.
 */
194 cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
/* The entry has been handled: unlink it from the queue and free it. */
196 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
197 __sub_data_free(data);
198 pthread_mutex_unlock(&handle->data_queue_mutex);
/*
 * GLib watch callback registered by __pims_ipc_create() on a subscriber
 * handle's eventfd channel (G_IO_IN | G_IO_HUP). `data` is the pims_ipc_s
 * handle. Under __gmutex it verifies that the handle is still listed in
 * subscribe_handles and then dispatches the queued events.
 */
204 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
206 pims_ipc_s *handle = (pims_ipc_s *)data;
210 if (condition & G_IO_HUP)
213 pthread_mutex_lock(&__gmutex);
215 // check whether the subscriber handle still exists
216 if (g_list_find(subscribe_handles, handle) == NULL) {
217 ERROR("No such handle that ID is %p", handle);
218 pthread_mutex_unlock(&__gmutex);
222 __pims_ipc_receive_for_subscribe(handle);
224 pthread_mutex_unlock(&__gmutex);
/*
 * Returns a process-wide sequence number kept in a function-local static.
 * 0xffffffff is the "not yet seeded" sentinel; when the counter holds that
 * value it is seeded from time(NULL).
 * No locking is done here. The visible caller, __pims_ipc_create(), calls it
 * while holding __gmutex.
 */
229 static unsigned int __get_global_sequence_no()
231 static unsigned int __gsequence_no = 0xffffffff;
233 if (__gsequence_no == 0xffffffff)
234 __gsequence_no = (unsigned int)time(NULL);
237 return __gsequence_no;
/*
 * Sends the identify message for a handle over handle->fd.
 *
 * Wire layout (all integers are native unsigned int):
 *   total length | client id length | client id bytes | sequence number
 *
 * Returns the result of socket_send().
 */
240 static int __pims_ipc_send_identify(pims_ipc_s *handle)
242 unsigned int sequence_no;
243 unsigned int client_id_len = strlen(handle->id);
244 unsigned int len = sizeof(unsigned int) // total size
245 + client_id_len + sizeof(unsigned int) // client_id
246 + sizeof(unsigned int) ; // seq_no
250 memset(buf, 0x0, len+1);
/* total length */
253 memcpy(buf, (void*)&len, sizeof(unsigned int));
254 length += sizeof(unsigned int);
/* client id: length prefix followed by the bytes (no terminating NUL) */
257 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
258 length += sizeof(unsigned int);
259 memcpy(buf+length, (void*)(handle->id), client_id_len);
260 length += client_id_len;
/* sequence number: advances handle->call_sequence_no */
263 GET_CALL_SEQUNECE_NO(handle, sequence_no);
264 memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
265 length += sizeof(unsigned int);
267 return socket_send(handle->fd, buf, length);
/*
 * Reads one response message from handle->fd.
 *
 * Fields are read in this order: total length, client id (length + bytes),
 * sequence number, then -- unless the message ends there -- call id
 * (length + bytes), an is_data flag and an optional length-prefixed payload
 * that is unmarshalled into a pims_ipc_data_h.
 *
 * When the received sequence number equals handle->call_sequence_no the data
 * handle is handed to *data_out (if data_out is non-NULL) or destroyed;
 * responses with a different sequence number are destroyed.
 *
 * NOTE(review): the results of the read() calls are added to the unsigned
 * read_len without being checked for -1 or short reads -- confirm whether that
 * is handled in lines not shown here.
 */
270 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
273 gboolean is_ok = FALSE;
275 pims_ipc_data_h data = NULL;
276 unsigned int sequence_no = 0;
277 char *client_id = NULL;
278 char *call_id = NULL;
281 /* read the size of message. note that ioctl is non-blocking */
282 if (ioctl(handle->fd, FIONREAD, &len)) {
283 ERROR("ioctl failed: %d", errno);
287 /* when server or client closed socket */
289 ERROR("[IPC Socket] connection is closed");
294 unsigned int read_len = 0;
295 unsigned int total_len = 0;
296 unsigned int client_id_len = 0;
297 unsigned int call_id_len = 0;
298 unsigned int is_data = FALSE;
/* total message length */
301 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
/* client id: length prefix, then the bytes into a zeroed buffer one byte larger */
304 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int)));
305 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
306 client_id = calloc(1, client_id_len+1);
307 if (client_id == NULL) {
308 ERROR("calloc fail");
314 ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len);
315 if (ret < 0) { ERROR("socket_recv error"); break; }
/* sequence number of the request this response answers */
319 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int)));
/* The message ends here: build a data handle that carries only the client id. */
320 if (total_len == read_len) {
322 data = pims_ipc_data_create(0);
324 ERROR("pims_ipc_data_create() Fail");
327 ret = pims_ipc_data_put(data, client_id, client_id_len);
329 WARNING("pims_ipc_data_put fail(%d)", ret);
/* call id: length prefix, then the bytes */
333 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
334 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
335 call_id = calloc(1, call_id_len+1);
336 if (call_id == NULL) {
337 ERROR("calloc fail");
344 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
345 if (ret < 0) { ERROR("socket_recv error"); break; }
/* is_data flag, then the optional length-prefixed payload */
348 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
350 unsigned int data_len;
351 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
352 if (data_len > 0 && data_len < UINT_MAX-1) {
353 buf = calloc(1, data_len+1);
355 ERROR("calloc fail");
361 ret = socket_recv(handle->fd, (void *)&(buf), data_len);
362 if (ret < 0) { ERROR("socket_recv error"); break; }
/* presumably takes ownership of buf (name says "steal") -- verify against pims-ipc-data */
365 data = pims_ipc_data_steal_unmarshal(buf, data_len);
367 ERROR("pims_ipc_data_steal_unmarshal() Fail");
373 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no);
/* Accept the response only when it matches the most recent request. */
379 if (sequence_no == handle->call_sequence_no) {
380 if (data_out != NULL) {
384 pims_ipc_data_destroy(data);
389 pims_ipc_data_destroy(data);
390 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
/*
 * Waits for a response on handle->fd using poll() with a 1000 ms timeout and
 * reads it with __pims_ipc_read_data() once the socket is readable.
 * POLLERR/POLLHUP is reported as a server disconnect.
 * data_out is forwarded unchanged to __pims_ipc_read_data() and may be NULL.
 */
399 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
402 struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd));
403 if (NULL == pollfds) {
404 ERROR("calloc() Fail");
408 pollfds[0].fd = handle->fd;
409 pollfds[0].events = POLLIN | POLLERR | POLLHUP;
413 ret = poll(pollfds, 1, 1000);
/* Interrupted / would-block results are singled out here; presumably the wait is retried -- confirm. */
414 if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
421 if (pollfds[0].revents & (POLLERR|POLLHUP)) {
422 ERROR("Server disconnected");
426 if (pollfds[0].revents & POLLIN) {
427 ret = __pims_ipc_read_data(handle, data_out);
/*
 * Creates the eventfd used to notify the GLib side about queued subscription
 * events, switches it to non-blocking mode and stores it in
 * handle->subscribe_fd.
 */
436 static int __open_subscribe_fd(pims_ipc_s *handle)
438 // router inproc eventfd
439 int subscribe_fd = eventfd(0,0);
443 if (-1 == subscribe_fd) {
444 ERROR("eventfd error : %d", errno);
447 VERBOSE("subscribe :%d\n", subscribe_fd);
/* Add O_NONBLOCK on top of the descriptor's current status flags. */
449 flags = fcntl (subscribe_fd, F_GETFL, 0);
452 ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK);
453 VERBOSE("subscribe fcntl : %d\n", ret);
455 handle->subscribe_fd = subscribe_fd;
/*
 * Reads one published event from handle->fd, wraps it in a
 * pims_ipc_subscribe_data_s, appends it to handle->data_queue and signals
 * handle->subscribe_fd so the event gets dispatched.
 * It is called from __io_thread().
 *
 * Fields are read in this order: total length, call id (length + bytes),
 * is_data flag, optional length-prefixed payload.
 * __io_thread() treats a negative return value as a closed server connection.
 */
459 static int __subscribe_data(pims_ipc_s * handle)
463 char *call_id = NULL;
465 pims_ipc_data_h dhandle = NULL;
468 /* read the size of message. note that ioctl is non-blocking */
469 if (ioctl(handle->fd, FIONREAD, &len)) {
470 ERROR("ioctl failed: %d", errno);
474 /* when server or client closed socket */
476 INFO("[IPC Socket] connection is closed");
481 unsigned int read_len = 0;
482 unsigned int total_len = 0;
483 unsigned int call_id_len = 0;
484 unsigned int is_data = FALSE;
/* total message length */
487 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
/* call id: length prefix, then the bytes into a zeroed buffer one byte larger */
490 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
491 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
492 call_id = calloc(1, call_id_len+1);
493 if (call_id == NULL) {
494 ERROR("calloc fail");
501 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
502 if (ret < 0) { ERROR("socket_recv error"); break; }
/* is_data flag, then the optional length-prefixed payload */
506 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
509 unsigned int data_len;
510 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
511 if (data_len > 0 && data_len < UINT_MAX-1) {
512 buf = calloc(1, data_len+1);
514 ERROR("calloc fail");
520 ret = socket_recv(handle->fd, (void *)&(buf), data_len);
522 ERROR("socket_recv error");
527 dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
528 if (NULL == dhandle) {
529 ERROR("pims_ipc_data_steal_unmarshal() Fail");
534 pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s));
535 if (NULL == sub_data) {
536 ERROR("calloc() Fail");
537 pims_ipc_data_destroy(dhandle);
/* The queue entry now holds both the payload handle and the call_id buffer. */
541 sub_data->handle = dhandle;
542 sub_data->call_id = call_id;
/* Queue the event, then wake the GLib watch on subscribe_fd. */
545 pthread_mutex_lock(&handle->data_queue_mutex);
546 handle->data_queue = g_list_append(handle->data_queue, sub_data);
547 pthread_mutex_unlock(&handle->data_queue_mutex);
548 write_command(handle->subscribe_fd, 1);
558 static gboolean __hung_up_cb(gpointer data)
560 pthread_mutex_lock(&__disconnect_cb_mutex);
561 if (_server_disconnected_cb.callback)
562 _server_disconnected_cb.callback(_server_disconnected_cb.user_data);
563 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Worker thread of a subscriber handle (started by __pims_ipc_create()).
 * `data` is the pims_ipc_s handle.
 *
 * It registers handle->fd with an epoll instance and loops on epoll_wait()
 * with a 50 ms timeout, so handle->epoll_stop_thread is re-checked regularly.
 * Readable events are passed to __subscribe_data(). A hang-up, or a negative
 * result from __subscribe_data(), schedules __hung_up_cb() on the GLib main
 * loop and stops the thread.
 */
567 static void* __io_thread(void *data)
569 pims_ipc_s *handle = data;
570 struct epoll_event ev = {0};
574 epfd = epoll_create(MAX_EPOLL_EVENT);
576 pthread_mutex_lock(&__gmutex);
578 ev.events = EPOLLIN | EPOLLHUP;
579 ev.data.fd = handle->fd;
581 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
582 WARN_IF(ret != 0, "listen error :%d", ret);
583 pthread_mutex_unlock(&__gmutex);
/* Stop flag is checked under __gmutex before waiting ... */
589 pthread_mutex_lock(&__gmutex);
591 if (handle->epoll_stop_thread) {
592 pthread_mutex_unlock(&__gmutex);
595 pthread_mutex_unlock(&__gmutex);
597 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
598 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
/* ... and again after the wait, since teardown may have started meanwhile. */
600 pthread_mutex_lock(&__gmutex);
602 if (handle->epoll_stop_thread) {
603 pthread_mutex_unlock(&__gmutex);
606 pthread_mutex_unlock(&__gmutex);
608 if (event_num == -1) {
609 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
610 ERROR("errno:%d\n", errno);
/* Events are processed while holding __gmutex. */
615 pthread_mutex_lock(&__gmutex);
616 for (i = 0; i < event_num; i++) {
617 if (events[i].events & EPOLLHUP) {
618 ERROR("server fd closed");
619 g_idle_add(__hung_up_cb, NULL);
620 handle->epoll_stop_thread = true;
624 if (events[i].events & EPOLLIN) {
625 if(__subscribe_data(handle) < 0) {
626 ERROR("server fd closed");
627 g_idle_add(__hung_up_cb, NULL);
628 handle->epoll_stop_thread = true;
633 pthread_mutex_unlock(&__gmutex);
/*
 * Creates a handle connected to the UNIX-domain socket named by `service`.
 *
 * In PIMS_IPC_MODE_REQ the handle identifies itself to the server, waits for
 * the reply and issues the internal CREATE call. Otherwise (subscriber mode)
 * it sets up the event queue, the notification eventfd, the I/O thread and a
 * GLib watch, and registers the handle in subscribe_handles.
 *
 * The setup runs under __gmutex. If it does not complete (is_ok stays FALSE),
 * the partially built handle is released with __pims_ipc_free_handle() after
 * the mutex has been unlocked.
 */
641 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
643 pims_ipc_s *handle = NULL;
644 gboolean is_ok = FALSE;
646 pthread_mutex_lock(&__gmutex);
649 struct sockaddr_un server_addr;
653 VERBOSE("Create %d th..", ref_cnt);
655 handle = g_new0(pims_ipc_s, 1);
656 if (handle == NULL) {
657 ERROR("Failed to allocation");
661 handle->subscribe_fd = -1;
662 handle->io_thread = 0;
663 handle->service = g_strdup(service);
/* Client id has the form "<pid hex>:<global sequence hex>". */
664 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
665 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
666 if (handle->fd < 0) {
667 ERROR("socket error : %d, errno: %d", handle->fd, errno);
/* Switch the socket to non-blocking mode. */
670 int flags = fcntl (handle->fd, F_GETFL, 0);
673 ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK);
674 VERBOSE("socket fcntl : %d\n", ret);
676 pthread_mutex_init(&handle->call_status_mutex, 0);
678 pthread_mutex_lock(&handle->call_status_mutex);
679 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
680 pthread_mutex_unlock(&handle->call_status_mutex);
/* The service string is used as the socket path (truncated to fit sun_path). */
682 bzero(&server_addr, sizeof(server_addr));
683 server_addr.sun_family = AF_UNIX;
684 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
686 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
688 ERROR("connect error : %d, errno: %d", ret, errno);
691 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
693 if (mode == PIMS_IPC_MODE_REQ) {
/* Seed the per-handle sequence number, identify to the server and wait for its reply. */
694 handle->call_sequence_no = (unsigned int)time(NULL);
695 ret = __pims_ipc_send_identify(handle);
697 ERROR("__pims_ipc_send_identify error");
700 __pims_ipc_receive(handle, NULL);
702 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) {
703 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
/* Subscriber setup: event queue, eventfd, I/O thread, GLib watch. */
707 handle->epoll_stop_thread = false;
708 pthread_mutex_init(&handle->data_queue_mutex, 0);
710 pthread_mutex_lock(&handle->data_queue_mutex);
711 handle->data_queue = NULL;
712 pthread_mutex_unlock(&handle->data_queue_mutex);
714 ret = __open_subscribe_fd(handle);
719 ret = pthread_create(&worker, NULL, __io_thread, handle);
722 handle->io_thread = worker;
724 GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
725 if (!async_channel) {
726 ERROR("g_io_channel_unix_new error");
729 handle->async_channel = async_channel;
730 handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
/* Both keys (call_id strings) and values (pims_ipc_cb_s) are released with g_free. */
731 handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
732 ASSERT(handle->subscribe_cb_table);
734 // add a subscriber handle to the global list
735 subscribe_handles = g_list_append(subscribe_handles, handle);
736 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
740 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
743 pthread_mutex_unlock(&__gmutex);
/* __pims_ipc_free_handle() locks __gmutex itself, hence the unlock above. */
745 if (FALSE == is_ok) {
747 __pims_ipc_free_handle(handle);
/* Public entry point: creates a request-mode handle for `service` via __pims_ipc_create(). */
755 API pims_ipc_h pims_ipc_create(char *service)
757 return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
/* Public entry point: creates a subscriber-mode handle for `service` via __pims_ipc_create(). */
760 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
762 return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
/*
 * Destroys a handle. In PIMS_IPC_MODE_REQ the internal DESTROY call is sent to
 * the server first; a failure of that call is only logged. The handle is then
 * released with __pims_ipc_free_handle().
 */
765 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
767 pims_ipc_s *handle = (pims_ipc_s *)ipc;
769 if (mode == PIMS_IPC_MODE_REQ) {
770 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) {
771 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
776 __pims_ipc_free_handle(handle);
/* Public entry point: destroys a handle created with pims_ipc_create(). */
779 API void pims_ipc_destroy(pims_ipc_h ipc)
781 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
/* Public entry point: destroys a handle created with pims_ipc_create_for_subscribe(). */
784 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
786 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
/*
 * Serialises and sends one request on handle->fd.
 *
 * Header layout (all integers are native unsigned int):
 *   total length | client id length | client id | sequence number |
 *   call id length | call id | is_data flag [| payload size]
 * When data_in carries a payload, the header is sent first and the payload
 * bytes follow through socket_send_data().
 *
 * Each call advances handle->call_sequence_no.
 */
789 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
792 unsigned int sequence_no = 0;
793 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
794 unsigned int call_id_len = strlen(call_id);
795 pims_ipc_data_s *data = NULL;
796 unsigned int is_data = FALSE;
797 unsigned int client_id_len = strlen(handle->id);
800 GET_CALL_SEQUNECE_NO(handle, sequence_no);
/* Size of the fixed header; the payload size field is added below when data is present. */
802 int len = sizeof(unsigned int) // total size
803 + client_id_len + sizeof(unsigned int) // client_id
804 + sizeof(unsigned int) // seq_no
805 + call_id_len + sizeof(unsigned int) // call_id
806 + sizeof(unsigned int); // is data
812 data = (pims_ipc_data_s*)data_in;
813 len += sizeof(unsigned int);
814 total_len = len + data->buf_size;
817 INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no);
821 memset(buf, 0x0, len+1);
/* total length */
823 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
824 length += sizeof(unsigned int);
/* client id */
827 client_id_len = strlen(handle->id);
828 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
829 length += sizeof(unsigned int);
830 memcpy(buf+length, (void*)(handle->id), client_id_len);
831 length += client_id_len;
/* sequence number */
834 memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
835 length += sizeof(unsigned int);
/* call id */
838 memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
839 length += sizeof(unsigned int);
840 memcpy(buf+length, (void*)(call_id), call_id_len);
841 length += call_id_len;
/* is_data flag */
845 memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
846 length += sizeof(unsigned int);
/* payload size, then header and payload are sent separately */
849 memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int));
850 length += sizeof(unsigned int);
852 ret = socket_send(handle->fd, buf, length);
854 ret = socket_send_data(handle->fd, data->buf, data->buf_size);
/* no payload: the header alone is the whole message */
857 ret = socket_send(handle->fd, buf, length);
/*
 * Synchronous call: sends the request for module/function with the optional
 * data_in and waits for the matching response through __pims_ipc_receive().
 * data_out is forwarded to the receive path and may be NULL (internal callers
 * in this file pass NULL).
 *
 * The call is rejected while the handle's call_status is not READY, i.e. while
 * an asynchronous call is still pending. In the lines visible here the status
 * itself is not changed by a synchronous call.
 */
866 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
867 pims_ipc_data_h *data_out)
869 pims_ipc_s *handle = (pims_ipc_s *)ipc;
873 ERROR("invalid handle : %p", ipc);
877 if (!module || !function) {
878 ERROR("invalid argument");
882 pthread_mutex_lock(&handle->call_status_mutex);
883 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
884 pthread_mutex_unlock(&handle->call_status_mutex);
885 ERROR("the previous call is in progress : %p", ipc);
888 pthread_mutex_unlock(&handle->call_status_mutex);
891 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
895 if (__pims_ipc_receive(handle, data_out) != 0) {
/*
 * Idle callback used when an async response was already available at the time
 * pims_ipc_call_async() sent the request. `data` is the pims_ipc_s handle.
 * It takes the parked response out of handle->dhandle_for_async_idler, marks
 * the handle READY, delivers the response to the user's async callback and
 * destroys it afterwards.
 */
902 static gboolean __call_async_idler_cb(gpointer data)
906 pims_ipc_s *handle = (pims_ipc_s *)data;
908 ASSERT(handle->dhandle_for_async_idler);
909 pims_ipc_data_h dhandle = handle->dhandle_for_async_idler;
910 handle->dhandle_for_async_idler = NULL;
/* Status is reset before the callback runs, so the callback may start a new call. */
912 pthread_mutex_lock(&handle->call_status_mutex);
913 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
914 pthread_mutex_unlock(&handle->call_status_mutex);
916 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
917 pims_ipc_data_destroy(dhandle);
/*
 * Receives the response of an asynchronous call. It is registered as a GLib
 * watch on handle->fd by pims_ipc_call_async(), which also calls it directly
 * with src == NULL right after sending. `data` is the pims_ipc_s handle.
 *
 * A response that arrives while no async call is in progress is discarded.
 * With src == NULL the response is parked in handle->dhandle_for_async_idler
 * and delivered later from __call_async_idler_cb(). Otherwise the handle is
 * marked READY and the user's callback is invoked here.
 */
922 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
924 pims_ipc_s *handle = (pims_ipc_s *)data;
925 pims_ipc_data_h dhandle = NULL;
927 if (__pims_ipc_receive(handle, &dhandle) == 0) {
928 VERBOSE("call status = %d", handle->call_status);
930 pthread_mutex_lock(&handle->call_status_mutex);
931 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
932 pthread_mutex_unlock(&handle->call_status_mutex);
933 pims_ipc_data_destroy(dhandle);
936 pthread_mutex_unlock(&handle->call_status_mutex);
937 if (src == NULL) { // A response is arrived too quickly
/* Defer delivery to an idle callback instead of invoking the user callback from inside pims_ipc_call_async(). */
938 handle->dhandle_for_async_idler = dhandle;
939 g_idle_add(__call_async_idler_cb, handle);
942 pthread_mutex_lock(&handle->call_status_mutex);
943 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
944 pthread_mutex_unlock(&handle->call_status_mutex);
946 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
947 pims_ipc_data_destroy(dhandle);
954 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
955 pims_ipc_call_async_cb callback, void *userdata)
957 pims_ipc_s *handle = (pims_ipc_s *)ipc;
961 ERROR("invalid handle : %p", ipc);
965 if (!module || !function || !callback) {
966 ERROR("invalid argument");
970 pthread_mutex_lock(&handle->call_status_mutex);
971 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
972 pthread_mutex_unlock(&handle->call_status_mutex);
973 ERROR("the previous call is in progress : %p", ipc);
976 pthread_mutex_unlock(&handle->call_status_mutex);
978 pthread_mutex_lock(&handle->call_status_mutex);
979 handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
980 pthread_mutex_unlock(&handle->call_status_mutex);
982 handle->call_async_callback = callback;
983 handle->call_async_userdata = userdata;
985 // add a callback for GIOChannel
986 if (!handle->async_channel) {
987 handle->async_channel = g_io_channel_unix_new(handle->fd);
988 if (!handle->async_channel) {
989 ERROR("g_io_channel_unix_new error");
994 source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
995 handle->async_source_id = source_id;
997 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
998 g_source_remove(source_id);
1002 __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
/*
 * Reports whether the handle's call_status is currently
 * PIMS_IPC_CALL_STATUS_IN_PROGRESS. The status is read under
 * call_status_mutex. An invalid handle is logged as an error.
 */
1007 API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1010 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1013 ERROR("invalid handle : %p", ipc);
1017 pthread_mutex_lock(&handle->call_status_mutex);
1018 if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1022 pthread_mutex_unlock(&handle->call_status_mutex);
/*
 * Registers `callback` (with `userdata`) for the event identified by
 * module/event on a subscriber handle. The registration is stored in
 * handle->subscribe_cb_table under the call id built by PIMS_IPC_MAKE_CALL_ID.
 *
 * Handles without a subscription table (i.e. not created for subscribing) and
 * NULL module/event/callback arguments are rejected.
 */
1026 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
1028 gchar *call_id = NULL;
1029 pims_ipc_cb_s *cb_data = NULL;
1030 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1032 if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1033 ERROR("invalid handle : %p", ipc);
1037 if (!module || !event || !callback) {
1038 ERROR("invalid argument");
1042 cb_data = g_new0(pims_ipc_cb_s, 1);
1043 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1045 VERBOSE("subscribe cb id[%s]", call_id);
1046 cb_data->callback = callback;
1047 cb_data->user_data = userdata;
/* The table was created with g_free as key and value destructor, so it takes over call_id and cb_data. */
1048 g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
/*
 * Removes the subscription for module/event from the handle's
 * subscribe_cb_table. Handles without a subscription table and NULL
 * module/event arguments are rejected. A missing entry is logged as an error.
 */
1053 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1055 gchar *call_id = NULL;
1056 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1058 if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1059 ERROR("invalid handle : %p", ipc);
1063 if (!module || !event) {
1064 ERROR("invalid argument");
/* NOTE(review): the release of this temporary call_id is not visible in this section -- confirm it is freed on every path. */
1068 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1070 VERBOSE("unsubscribe cb id[%s]", call_id);
1072 if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1073 ERROR("g_hash_table_remove error");
/* Clears the process-wide server-disconnected callback and its user data under __disconnect_cb_mutex. */
1082 API int pims_ipc_unset_server_disconnected_cb()
1084 pthread_mutex_lock(&__disconnect_cb_mutex);
1085 _server_disconnected_cb.callback = NULL;
1086 _server_disconnected_cb.user_data = NULL;
1087 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Installs the process-wide server-disconnected callback and its user data
 * under __disconnect_cb_mutex, replacing any previous registration.
 * The callback is invoked from __hung_up_cb().
 */
1091 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data)
1093 pthread_mutex_lock(&__disconnect_cb_mutex);
1094 _server_disconnected_cb.callback = callback;
1095 _server_disconnected_cb.user_data = user_data;
1096 pthread_mutex_unlock(&__disconnect_cb_mutex);