4 * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <poll.h> /* pollfds */
28 #include <sys/un.h> /* sockaddr_un */
29 #include <sys/ioctl.h> /* ioctl */
30 #include <sys/socket.h> /* socket */
31 #include <sys/types.h>
32 #include <sys/epoll.h> /* epoll */
33 #include <sys/eventfd.h> /* eventfd */
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-ipc-data.h"
40 #include "pims-ipc-data-internal.h"
/*
 * Pre-increments (handle)->call_sequence_no and stores the new value in
 * sequence_no. Only the opening of the do-block is visible in this view.
 * NOTE(review): the macro name misspells "SEQUENCE"; renaming it would
 * require updating every user of the macro.
 */
43 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
44 sequence_no = ++((handle)->call_sequence_no);\
/*
 * File-wide lock. In this file it is taken by handle creation and teardown,
 * by the subscribe dispatch handler and by the I/O thread loop.
 */
47 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Per-handle call state. READY is required before pims_ipc_call() or
 * pims_ipc_call_async() will send; IN_PROGRESS is set by pims_ipc_call_async()
 * and cleared by the async response handlers.
 */
50 PIMS_IPC_CALL_STATUS_READY = 0,
51 PIMS_IPC_CALL_STATUS_IN_PROGRESS
52 } pims_ipc_call_status_e;
/*
 * Request mode, the value pims_ipc_create() and pims_ipc_destroy() pass on.
 * The rest of this enumeration is not visible in this view.
 */
55 PIMS_IPC_MODE_REQ = 0,
/*
 * Subscriber callback member. Presumably part of pims_ipc_cb_s, whose
 * callback/user_data members are used by the subscribe code below; the
 * enclosing struct is not visible here - confirm.
 */
60 pims_ipc_subscribe_cb callback;
/*
 * One queued subscription event. The code below also reads and writes a
 * call_id member whose declaration is not visible in this view.
 * NOTE(review): this member is declared as a pointer to pims_ipc_data_h, yet
 * it is assigned a pims_ipc_data_h and passed to pims_ipc_data_destroy()
 * as-is elsewhere in this file; confirm the intended type.
 */
66 pims_ipc_data_h *handle;
67 } pims_ipc_subscribe_data_s;
/*
 * Members of the client handle (pims_ipc_s). Other members used in this file
 * (service, id, fd, subscribe_fd, io_thread, data_queue) are declared on
 * lines not visible in this view.
 */
/* GLib channel: wraps subscribe_fd on subscribe handles, or fd for async calls */
73 GIOChannel *async_channel;
/* source id of the _g_io_hup_cb watch added in request mode */
74 guint disconnected_source;
/* source id of the watch added on async_channel */
75 guint async_source_id;
/* guards call_status */
76 pthread_mutex_t call_status_mutex;
77 pims_ipc_call_status_e call_status;
/* seeded from time(NULL), bumped for every sent frame, matched against replies */
78 unsigned int call_sequence_no;
/* completion callback and its argument, set by pims_ipc_call_async() */
79 pims_ipc_call_async_cb call_async_callback;
80 void *call_async_userdata;
/* response parked for __call_async_idler_cb when it arrived before the main loop ran */
81 pims_ipc_data_h dhandle_for_async_idler;
/* set to TRUE to make __io_thread leave its loop */
84 int epoll_stop_thread;
/* call-id string -> callback record, filled by pims_ipc_subscribe() */
86 GHashTable *subscribe_cb_table;
/* guards data_queue */
88 pthread_mutex_t data_queue_mutex;
/* Handle counter; decremented in __pims_ipc_free_handle(). */
92 static unsigned int ref_cnt;
/* Live subscribe-mode handles; consulted before dispatching subscribe events. */
93 static GList *subscribe_handles;
/* Registered pims_ipc_server_disconnected_cb_t records, one per handle. */
94 static GList *disconnected_list;
/*
 * Server-disconnected callback record. The code below also uses user_data
 * and handle members whose declarations are not visible in this view.
 */
97 pims_ipc_server_disconnected_cb callback;
100 } pims_ipc_server_disconnected_cb_t;
102 /* start deprecated */
/* Single global callback slot written by the deprecated set/unset API. */
103 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
/* Guards _server_disconnected_cb and disconnected_list. */
105 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Releases one queued subscription event. Used as the element destructor for
 * g_list_free_full() on the data queue and called directly after dispatch.
 * Only the destruction of the embedded data handle is visible here; any
 * further frees are on lines outside this view.
 */
107 static void __sub_data_free(gpointer user_data)
109 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
110 pims_ipc_data_destroy(data->handle);
/*
 * Tears down a client handle: stops and joins the I/O thread, removes the
 * GLib sources, frees the subscribe table and pending event queue, closes
 * the event fd and destroys the per-handle mutexes. Runs under __gmutex
 * except while joining the I/O thread.
 */
115 static void __pims_ipc_free_handle(pims_ipc_s *handle)
117 pthread_mutex_lock(&__gmutex);
/* ask __io_thread to leave its loop */
119 handle->epoll_stop_thread = TRUE;
121 if (handle->fd != -1)
/*
 * __gmutex is released across the join because __io_thread takes it on every
 * iteration to read the stop flag; holding it here would block that thread.
 */
124 pthread_mutex_unlock(&__gmutex);
125 if (handle->io_thread)
126 pthread_join(handle->io_thread, NULL);
127 pthread_mutex_lock(&__gmutex);
130 g_free(handle->service);
132 if (handle->async_channel) {
133 /* remove a subscriber handle from the global list */
134 subscribe_handles = g_list_remove(subscribe_handles, handle);
135 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
137 g_source_remove(handle->async_source_id);
138 g_io_channel_unref(handle->async_channel);
141 if (handle->subscribe_cb_table)
142 g_hash_table_destroy(handle->subscribe_cb_table);
/* drop events that were queued but never dispatched */
144 pthread_mutex_lock(&handle->data_queue_mutex);
145 if (handle->data_queue)
146 g_list_free_full(handle->data_queue, __sub_data_free);
148 pthread_mutex_unlock(&handle->data_queue_mutex);
149 pthread_mutex_destroy(&handle->data_queue_mutex);
151 if (handle->subscribe_fd != -1)
152 close(handle->subscribe_fd);
154 if (0 < handle->disconnected_source)
155 g_source_remove(handle->disconnected_source);
157 pthread_mutex_destroy(&handle->call_status_mutex);
/*
 * NOTE(review): ref_cnt is unsigned, so "<= 0" can only mean "== 0"; if it
 * were already 0 the decrement would wrap instead of going negative.
 */
161 if (--ref_cnt <= 0) {
162 if (subscribe_handles)
163 g_list_free(subscribe_handles);
164 subscribe_handles = NULL;
167 pthread_mutex_unlock(&__gmutex);
/*
 * Main-loop side of subscription delivery: consumes the wake-up from the
 * event fd, takes the head of the handle's data queue, looks up the
 * subscriber by call id and invokes it, then unlinks and frees the entry.
 * The looping and exit conditions are on lines not visible in this view.
 */
170 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
172 pims_ipc_cb_s *cb_data = NULL;
/* consume the counter written by __subscribe_data() */
176 read_command(handle->subscribe_fd, &dummy);
178 pthread_mutex_lock(&handle->data_queue_mutex);
/* nothing queued */
179 if (!handle->data_queue) {
180 pthread_mutex_unlock(&handle->data_queue_mutex);
184 GList *cursor = g_list_first(handle->data_queue);
185 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
187 pthread_mutex_unlock(&handle->data_queue_mutex);
191 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
/*
 * NOTE(review): no variable named call_id is declared in the visible lines of
 * this function; the key that was looked up is data->call_id - confirm.
 */
193 VERBOSE("unable to find %s", call_id);
195 cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
/* the entry is consumed whether or not a subscriber was found - TODO confirm against hidden lines */
197 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
198 __sub_data_free(data);
199 pthread_mutex_unlock(&handle->data_queue_mutex);
/*
 * GIOChannel watch callback for a subscribe handle's event fd. Under
 * __gmutex it verifies the handle is still registered in subscribe_handles
 * and then dispatches queued events. The G_IO_HUP handling and the return
 * values are on lines not visible in this view.
 */
205 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
207 pims_ipc_s *handle = (pims_ipc_s *)data;
209 if (condition & G_IO_HUP)
212 pthread_mutex_lock(&__gmutex);
214 /* check that the subscriber handle still exists */
215 if (g_list_find(subscribe_handles, handle) == NULL) {
216 ERR("No such handle that ID is %p", handle);
217 pthread_mutex_unlock(&__gmutex);
221 __pims_ipc_receive_for_subscribe(handle);
223 pthread_mutex_unlock(&__gmutex);
/*
 * Returns a process-wide sequence number kept in a function-local static.
 * 0xffffffff is the "not yet seeded" marker; the first call seeds the value
 * from time(NULL). Any step applied on later calls is on lines not visible
 * in this view.
 */
228 static unsigned int __get_global_sequence_no()
230 static unsigned int __gsequence_no = 0xffffffff;
232 if (__gsequence_no == 0xffffffff)
233 __gsequence_no = (unsigned int)time(NULL);
236 return __gsequence_no;
/*
 * Sends the identify frame for a handle. Frame layout, in order:
 *   total_len | client_id_len | client_id bytes | seq_no
 * Returns whatever socket_send() returns.
 */
239 static int __pims_ipc_send_identify(pims_ipc_s *handle)
241 unsigned int total_len, seq_no;
242 unsigned int client_id_len = strlen(handle->id);
244 total_len = sizeof(total_len) + sizeof(client_id_len)+client_id_len + sizeof(seq_no);
/* variable-length stack buffer sized from the client id length */
247 char buf[total_len+1];
248 memset(buf, 0x0, total_len+1);
250 memcpy(buf, &total_len, sizeof(total_len));
251 length += sizeof(total_len);
253 memcpy(buf+length, &(client_id_len), sizeof(client_id_len));
254 length += sizeof(client_id_len);
255 memcpy(buf+length, handle->id, client_id_len);
256 length += client_id_len;
/* bumps handle->call_sequence_no; the reply is matched against this value */
258 GET_CALL_SEQUNECE_NO(handle, seq_no);
259 memcpy(buf+length, &(seq_no), sizeof(seq_no));
260 length += sizeof(seq_no);
262 return socket_send(handle->fd, buf, length);
/*
 * Reads one response frame from handle->fd. Fields are consumed in this
 * order:
 *   total_len | client_id_len | client_id | seq_no
 *   then, unless the frame ends there,
 *   call_id_len | call_id | has_data | [data_len | data]
 * When the frame's seq_no equals handle->call_sequence_no the payload is
 * accepted (the hand-over to *data_out is on a hidden line); otherwise it
 * is destroyed and logged as a mismatch. Return values and cleanup of the
 * temporary buffers are on lines not visible in this view.
 *
 * NOTE(review): read_len is unsigned and accumulates the raw return of
 * read(); a failed read (-1) would wrap rather than be detected here.
 * NOTE(review): the length checks only exclude 0 and values near UINT_MAX,
 * so a peer-supplied length directly sizes the calloc() calls - confirm
 * that the peer is trusted.
 */
265 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
268 gboolean is_ok = FALSE;
270 pims_ipc_data_h data = NULL;
271 unsigned int seq_no = 0;
272 char *client_id = NULL;
273 char *call_id = NULL;
276 /* read the size of message. note that ioctl is non-blocking */
277 if (ioctl(handle->fd, FIONREAD, &len)) {
278 ERR("ioctl failed: %d", errno);
282 /* when server or client closed socket */
284 ERR("[IPC Socket] connection is closed");
289 unsigned int read_len = 0;
290 unsigned int total_len = 0;
291 unsigned int client_id_len = 0;
292 unsigned int call_id_len = 0;
293 unsigned int has_data = FALSE;
295 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
297 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &(client_id_len), sizeof(client_id_len)));
298 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
/* one extra zeroed byte keeps the id NUL-terminated */
299 client_id = calloc(1, client_id_len+1);
300 if (client_id == NULL) {
306 ret = socket_recv(handle->fd, (void *)&client_id, client_id_len);
307 if (ret < 0) { ERR("socket_recv error"); break; }
310 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &seq_no, sizeof(seq_no)));
/* header-only frame: answer with a data handle built from the client id */
311 if (total_len == read_len) {
313 data = pims_ipc_data_create(0);
315 ERR("pims_ipc_data_create() Fail");
318 ret = pims_ipc_data_put(data, client_id, client_id_len);
320 WARN("pims_ipc_data_put fail(%d)", ret);
324 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
325 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
326 call_id = calloc(1, call_id_len+1);
327 if (call_id == NULL) {
334 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
335 if (ret < 0) { ERR("socket_recv error"); break; }
338 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
340 unsigned int data_len;
341 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
342 if (data_len > 0 && data_len < UINT_MAX-1) {
343 buf = calloc(1, data_len+1);
350 ret = socket_recv(handle->fd, (void *)&buf, data_len);
351 if (ret < 0) { ERR("socket_recv error"); break; }
/* "steal" suggests buf ownership moves to the data handle - TODO confirm */
354 data = pims_ipc_data_steal_unmarshal(buf, data_len);
356 ERR("pims_ipc_data_steal_unmarshal() Fail");
361 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, seq_no);
/* only a reply to the most recently sent frame is accepted */
367 if (seq_no == handle->call_sequence_no) {
368 if (data_out != NULL)
371 pims_ipc_data_destroy(data);
375 pims_ipc_data_destroy(data);
376 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, seq_no);
/*
 * Waits for a response on handle->fd with poll() using a 1000 ms timeout,
 * then reads it with __pims_ipc_read_data(). POLLERR/POLLHUP is logged as a
 * server disconnect. The retry loop and return values are on lines not
 * visible in this view.
 */
385 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
388 struct pollfd pollfds[1];
390 pollfds[0].fd = handle->fd;
391 pollfds[0].events = POLLIN | POLLERR | POLLHUP;
395 ret = poll(pollfds, 1, 1000);
/* transient poll failures are singled out here; the action taken is on a hidden line */
396 if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
403 if (pollfds[0].revents & (POLLERR|POLLHUP)) {
404 ERR("Server disconnected");
408 if (pollfds[0].revents & POLLIN) {
409 ret = __pims_ipc_read_data(handle, data_out);
/*
 * Creates the eventfd used to wake the main loop when the I/O thread has
 * queued a subscription event, switches it to non-blocking mode and stores
 * it in handle->subscribe_fd. Error returns are on lines not visible here.
 */
418 static int __open_subscribe_fd(pims_ipc_s *handle)
423 int subscribe_fd = eventfd(0, 0);
424 if (-1 == subscribe_fd) {
425 ERR("eventfd error : %d", errno);
428 VERBOSE("subscribe :%d\n", subscribe_fd);
/* add O_NONBLOCK while preserving the existing status flags */
430 flags = fcntl(subscribe_fd, F_GETFL, 0);
434 ret = fcntl(subscribe_fd, F_SETFL, flags | O_NONBLOCK);
436 ERR("fcntl() Fail(%d)", errno);
438 handle->subscribe_fd = subscribe_fd;
/*
 * I/O-thread side of subscription delivery. Reads one event frame from
 * handle->fd, consumed in this order:
 *   total_len | call_id_len | call_id | has_data | [data_len | data]
 * wraps it in a pims_ipc_subscribe_data_s, appends it to the handle's data
 * queue under data_queue_mutex and signals the event fd so the main loop
 * dispatches it. __io_thread treats a negative return as a closed server
 * connection. Return statements are on lines not visible in this view.
 *
 * NOTE(review): read_len is unsigned and accumulates raw read() results, and
 * the length checks do not bound peer-supplied sizes before calloc().
 */
442 static int __subscribe_data(pims_ipc_s * handle)
446 char *call_id = NULL;
448 pims_ipc_data_h dhandle = NULL;
451 /* read the size of message. note that ioctl is non-blocking */
452 if (ioctl(handle->fd, FIONREAD, &len)) {
453 ERR("ioctl failed: %d", errno);
457 /* when server or client closed socket */
459 INFO("[IPC Socket] connection is closed");
464 unsigned int read_len = 0;
465 unsigned int total_len = 0;
466 unsigned int call_id_len = 0;
467 unsigned int has_data = FALSE;
469 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
471 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
472 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
/* one extra zeroed byte keeps the call id NUL-terminated */
473 call_id = calloc(1, call_id_len+1);
474 if (call_id == NULL) {
481 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
482 if (ret < 0) { ERR("socket_recv error"); break; }
485 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
488 unsigned int data_len;
489 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
490 if (data_len > 0 && data_len < UINT_MAX-1) {
491 buf = calloc(1, data_len+1);
498 ret = socket_recv(handle->fd, (void *)&buf, data_len);
500 ERR("socket_recv error(%d)", ret);
505 dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
506 if (NULL == dhandle) {
507 ERR("pims_ipc_data_steal_unmarshal() Fail");
511 pims_ipc_subscribe_data_s *sub_data;
512 sub_data = calloc(1, sizeof(pims_ipc_subscribe_data_s));
513 if (NULL == sub_data) {
514 ERR("calloc() Fail");
515 pims_ipc_data_destroy(dhandle);
/* the queue entry now refers to both the payload and the call id string */
519 sub_data->handle = dhandle;
520 sub_data->call_id = call_id;
523 pthread_mutex_lock(&handle->data_queue_mutex);
524 handle->data_queue = g_list_append(handle->data_queue, sub_data);
525 pthread_mutex_unlock(&handle->data_queue_mutex);
/* wake __pims_ipc_subscribe_handler in the main loop */
526 write_command(handle->subscribe_fd, 1);
/*
 * Idle callback scheduled when the server connection is found closed; data
 * is the affected handle. Walks disconnected_list and invokes the callback
 * of every record registered for that handle. The loop header and return
 * value are on lines not visible in this view.
 *
 * NOTE(review): disconnected_list is tested for NULL before
 * __disconnect_cb_mutex is taken.
 * NOTE(review): the user callback runs with __disconnect_cb_mutex held; a
 * callback that calls back into the add/remove API, which takes the same
 * statically initialised mutex, would presumably block - confirm.
 */
536 static gboolean __hung_up_cb(gpointer data)
538 GList *cursor = NULL;
540 if (NULL == disconnected_list) {
541 DBG("No disconnected list");
545 pthread_mutex_lock(&__disconnect_cb_mutex);
546 cursor = g_list_first(disconnected_list);
548 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
549 if (disconnected && disconnected->handle == data && disconnected->callback) {
550 DBG("call hung_up callback");
551 disconnected->callback(disconnected->user_data);
554 cursor = g_list_next(cursor);
556 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Worker thread for a subscribe handle; data is the pims_ipc_s handle.
 * Registers handle->fd with an epoll instance and loops on epoll_wait()
 * with a 50 ms timeout. handle->epoll_stop_thread is checked under
 * __gmutex before and after each wait. Incoming data is handed to
 * __subscribe_data(); a hang-up, or a failure of that function, sets the
 * stop flag. Loop construct, breaks and cleanup are on lines not visible
 * in this view.
 */
561 static void* __io_thread(void *data)
563 pims_ipc_s *handle = data;
564 struct epoll_event ev = {0};
568 epfd = epoll_create(MAX_EPOLL_EVENT);
570 pthread_mutex_lock(&__gmutex);
572 ev.events = EPOLLIN | EPOLLHUP;
573 ev.data.fd = handle->fd;
575 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
576 WARN_IF(ret != 0, "listen error :%d", ret);
577 pthread_mutex_unlock(&__gmutex);
/* stop flag check before blocking */
583 pthread_mutex_lock(&__gmutex);
584 if (handle->epoll_stop_thread) {
585 pthread_mutex_unlock(&__gmutex);
588 pthread_mutex_unlock(&__gmutex);
590 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
/* the short timeout bounds how long a stop request can go unnoticed */
591 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
/* stop flag check again: the handle may have been torn down during the wait */
593 pthread_mutex_lock(&__gmutex);
595 if (handle->epoll_stop_thread) {
596 pthread_mutex_unlock(&__gmutex);
599 pthread_mutex_unlock(&__gmutex);
601 if (event_num == -1) {
602 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
603 ERR("errno:%d\n", errno);
608 pthread_mutex_lock(&__gmutex);
609 for (i = 0; i < event_num; i++) {
610 if (events[i].events & EPOLLHUP) {
611 ERR("server fd closed");
612 handle->epoll_stop_thread = TRUE;
616 if (events[i].events & EPOLLIN) {
617 if (__subscribe_data(handle) < 0) {
618 ERR("server fd closed");
/* the disconnect callbacks run from the GLib main loop, not from this thread */
619 g_idle_add(__hung_up_cb, handle);
620 handle->epoll_stop_thread = TRUE;
625 pthread_mutex_unlock(&__gmutex);
/*
 * GIOChannel watch callback on a request handle's socket; data is the
 * pims_ipc_s handle. Distinguishes a hang-up condition from readable data,
 * and for readable data peeks at the socket without consuming it, testing
 * for a zero-length result. Any other condition is logged as invalid. The
 * actions taken in each branch and the return values are on lines not
 * visible in this view.
 */
633 static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data)
635 if (G_IO_HUP & condition) {
640 } else if (G_IO_IN & condition) {
/* MSG_PEEK leaves the bytes in the socket for the real reader */
642 if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) {
648 ERR("Invalid condition (%d)", condition);
/*
 * Creates a client handle connected to the UNIX-domain socket named by
 * service. Holds __gmutex for the whole setup.
 *
 * Common steps: allocate the handle, build its id from the pid and a global
 * sequence number, open a non-blocking stream socket, connect.
 * Request mode (PIMS_IPC_MODE_REQ): install the hang-up watch, send the
 * identify frame, wait for its reply and issue the internal CREATE call.
 * The remaining setup (presumably the subscribe-mode branch; its condition
 * is on a hidden line): create the event fd, start __io_thread, watch the
 * event fd from the main loop, create the subscriber table and register
 * the handle in subscribe_handles.
 *
 * When is_ok is still FALSE at the end the handle is released with
 * __pims_ipc_free_handle(). The return statements and the places where
 * is_ok is set are on lines not visible in this view.
 */
653 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
655 pims_ipc_s *handle = NULL;
656 gboolean is_ok = FALSE;
658 pthread_mutex_lock(&__gmutex);
661 struct sockaddr_un server_addr;
665 VERBOSE("Create %d th..", ref_cnt);
667 handle = g_new0(pims_ipc_s, 1);
668 if (handle == NULL) {
669 ERR("Failed to allocation");
/* -1 / 0 mark "not opened" so the teardown path can skip them */
673 handle->subscribe_fd = -1;
674 handle->io_thread = 0;
675 handle->service = g_strdup(service);
676 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
677 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
678 if (handle->fd < 0) {
679 ERR("socket error : %d, errno: %d", handle->fd, errno);
682 int flags = fcntl(handle->fd, F_GETFL, 0);
685 ret = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
687 ERR("fcntl() Fail(%d)", errno);
689 pthread_mutex_init(&handle->call_status_mutex, 0);
691 pthread_mutex_lock(&handle->call_status_mutex);
692 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
693 pthread_mutex_unlock(&handle->call_status_mutex);
695 bzero(&server_addr, sizeof(server_addr));
696 server_addr.sun_family = AF_UNIX;
/* NOTE(review): a service path longer than sun_path is silently truncated here */
697 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
699 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
701 ERR("connect error : %d, errno: %d", ret, errno);
704 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
706 if (mode == PIMS_IPC_MODE_REQ) {
/* the watch keeps its own reference to the channel, so the local one is dropped */
707 GIOChannel *ch = g_io_channel_unix_new(handle->fd);
708 handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP,
709 _g_io_hup_cb, handle);
710 g_io_channel_unref(ch);
712 handle->call_sequence_no = (unsigned int)time(NULL);
713 ret = __pims_ipc_send_identify(handle);
715 ERR("__pims_ipc_send_identify error");
/* NOTE(review): the result of waiting for the identify reply is not checked */
718 __pims_ipc_receive(handle, NULL);
720 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0)
721 WARN("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
724 handle->epoll_stop_thread = FALSE;
725 pthread_mutex_init(&handle->data_queue_mutex, 0);
727 pthread_mutex_lock(&handle->data_queue_mutex);
728 handle->data_queue = NULL;
729 pthread_mutex_unlock(&handle->data_queue_mutex);
731 ret = __open_subscribe_fd(handle);
736 ret = pthread_create(&worker, NULL, __io_thread, handle);
739 handle->io_thread = worker;
741 GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
742 if (!async_channel) {
743 ERR("g_io_channel_unix_new error");
746 handle->async_channel = async_channel;
747 handle->async_source_id = g_io_add_watch(handle->async_channel,
748 G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
749 handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal,
751 ASSERT(handle->subscribe_cb_table);
753 /* add a subscriber handle to the global list */
754 subscribe_handles = g_list_append(subscribe_handles, handle);
755 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
759 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
762 pthread_mutex_unlock(&__gmutex);
/* the teardown runs after __gmutex is released because it takes the lock itself */
764 if (FALSE == is_ok) {
766 __pims_ipc_free_handle(handle);
/*
 * Public API: creates a request-mode client handle for the given service
 * socket path. Returns whatever __pims_ipc_create() returns.
 */
774 API pims_ipc_h pims_ipc_create(char *service)
776 return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
/*
 * Public API: creates a subscribe-mode client handle for the given service
 * socket path. Returns whatever __pims_ipc_create() returns.
 */
779 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
781 return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
/*
 * Destroys a handle. In request mode the server is first told via the
 * internal DESTROY call; a failure of that call is only logged. The handle
 * is then released with __pims_ipc_free_handle().
 */
784 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
786 pims_ipc_s *handle = ipc;
788 if (mode == PIMS_IPC_MODE_REQ) {
789 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY,
791 WARN("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
796 __pims_ipc_free_handle(handle);
/* Public API: destroys a handle created with pims_ipc_create(). */
799 API void pims_ipc_destroy(pims_ipc_h ipc)
801 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
/* Public API: destroys a handle created with pims_ipc_create_for_subscribe(). */
804 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
806 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
/*
 * Serialises and sends one call frame. Header layout, in order:
 *   total_len | client_id_len | client_id | seq_no |
 *   call_id_len | call_id | has_data | [payload size]
 * When a payload is present total_len also counts data->buf_size, and the
 * payload bytes are sent separately with socket_send_data() after the
 * header. The branch conditions, buffer declaration, release of call_id
 * and return values are on lines not visible in this view.
 */
809 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
813 unsigned int total_len;
814 unsigned int seq_no = 0;
815 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
816 unsigned int call_id_len = strlen(call_id);
817 pims_ipc_data_s *data = NULL;
818 unsigned int has_data = FALSE;
819 unsigned int client_id_len = strlen(handle->id);
/* bumps handle->call_sequence_no; the response is matched against this value */
821 GET_CALL_SEQUNECE_NO(handle, seq_no);
/* header size without any payload */
823 int len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(seq_no)
824 + call_id_len + sizeof(call_id_len) + sizeof(has_data);
/* with a payload: one more length field in the header, payload counted in total_len */
830 len += sizeof(unsigned int);
831 total_len = len + data->buf_size;
834 INFO("len(%d),client_id(%s),call_id(%s),seq_no(%d)", len, handle->id, call_id, seq_no);
837 memset(buf, 0x0, len+1);
839 memcpy(buf, &total_len, sizeof(total_len));
840 length += sizeof(total_len);
/* recomputed here although the same value was already taken above */
842 client_id_len = strlen(handle->id);
843 memcpy(buf+length, &client_id_len, sizeof(client_id_len));
844 length += sizeof(client_id_len);
845 memcpy(buf+length, handle->id, client_id_len);
846 length += client_id_len;
848 memcpy(buf+length, &seq_no, sizeof(seq_no));
849 length += sizeof(seq_no);
851 memcpy(buf+length, &call_id_len, sizeof(call_id_len));
852 length += sizeof(call_id_len);
853 memcpy(buf+length, call_id, call_id_len);
854 length += call_id_len;
857 memcpy(buf+length, &has_data, sizeof(has_data));
858 length += sizeof(has_data);
861 memcpy(buf+length, &(data->buf_size), sizeof(data->buf_size));
862 length += sizeof(data->buf_size);
/* header first, then the payload bytes */
864 ret = socket_send(handle->fd, buf, length);
866 ret = socket_send_data(handle->fd, data->buf, data->buf_size);
/* header-only send; presumably the no-payload branch - TODO confirm */
868 ret = socket_send(handle->fd, buf, length);
/*
 * Public API: synchronous call. Sends module/function with the optional
 * data_in payload and waits for the matching response, which is delivered
 * through data_out.
 * Returns -1 when ipc, module or function is NULL. The call is rejected,
 * with an error log, while a previous call is still in progress on the
 * handle. Other return values are on lines not visible in this view.
 */
877 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
878 pims_ipc_data_h *data_out)
880 pims_ipc_s *handle = ipc;
882 RETV_IF(NULL == ipc, -1);
883 RETV_IF(NULL == module, -1);
884 RETV_IF(NULL == function, -1);
/* refuse to interleave with an outstanding async call */
886 pthread_mutex_lock(&handle->call_status_mutex);
887 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
888 pthread_mutex_unlock(&handle->call_status_mutex);
889 ERR("the previous call is in progress : %p", ipc);
892 pthread_mutex_unlock(&handle->call_status_mutex);
894 if (__pims_ipc_send(handle, module, function, data_in) != 0)
897 if (__pims_ipc_receive(handle, data_out) != 0)
/*
 * Idle callback used when an async response was already available right
 * after sending; data is the pims_ipc_s handle. Takes the parked response
 * out of the handle, marks the handle READY, invokes the user's async
 * callback and destroys the response. Returns FALSE when data is NULL; the
 * normal return value is on a line not visible in this view.
 */
903 static gboolean __call_async_idler_cb(gpointer data)
905 pims_ipc_s *handle = data;
906 pims_ipc_data_h dhandle;
908 RETV_IF(NULL == handle, FALSE);
/* detach the parked response from the handle before running user code */
910 dhandle = handle->dhandle_for_async_idler;
911 handle->dhandle_for_async_idler = NULL;
/* READY is set before the callback, so the callback may issue a new call */
913 pthread_mutex_lock(&handle->call_status_mutex);
914 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
915 pthread_mutex_unlock(&handle->call_status_mutex);
917 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
918 pims_ipc_data_destroy(dhandle);
/*
 * Receives the response to an async call; data is the pims_ipc_s handle.
 * Invoked either by the GLib watch on the socket or directly from
 * pims_ipc_call_async() with src == NULL. A response that arrives while no
 * call is in progress is discarded. With src == NULL the response is parked
 * on the handle and delivered later from an idle callback; otherwise the
 * handle is marked READY and the user callback runs immediately. Branch
 * structure and return values are on lines not visible in this view.
 */
923 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition,
926 pims_ipc_s *handle = data;
927 pims_ipc_data_h dhandle = NULL;
929 if (__pims_ipc_receive(handle, &dhandle) == 0) {
930 VERBOSE("call status = %d", handle->call_status);
932 pthread_mutex_lock(&handle->call_status_mutex);
933 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
934 pthread_mutex_unlock(&handle->call_status_mutex);
935 pims_ipc_data_destroy(dhandle);
937 pthread_mutex_unlock(&handle->call_status_mutex);
938 if (src == NULL) { /* A response is arrived too quickly */
/* deferred so the user callback does not run inside pims_ipc_call_async() */
939 handle->dhandle_for_async_idler = dhandle;
940 g_idle_add(__call_async_idler_cb, handle);
942 pthread_mutex_lock(&handle->call_status_mutex);
943 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
944 pthread_mutex_unlock(&handle->call_status_mutex);
946 handle->call_async_callback((pims_ipc_h)handle, dhandle,
947 handle->call_async_userdata);
948 pims_ipc_data_destroy(dhandle);
/*
 * Public API: asynchronous call. Marks the handle IN_PROGRESS, records
 * callback/user_data, installs a GLib watch on the socket and sends the
 * request; callback later receives the response.
 * Returns -1 when ipc, module, function or callback is NULL. The call is
 * rejected, with an error log, while a previous call is in progress. Other
 * return values are on lines not visible in this view.
 *
 * NOTE(review): the READY check and the switch to IN_PROGRESS happen under
 * two separate acquisitions of call_status_mutex, so two threads could
 * both pass the check.
 */
955 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function,
956 pims_ipc_data_h data_in, pims_ipc_call_async_cb callback, void *user_data)
958 pims_ipc_s *handle = ipc;
961 RETV_IF(NULL == ipc, -1);
962 RETV_IF(NULL == module, -1);
963 RETV_IF(NULL == function, -1);
964 RETV_IF(NULL == callback, -1);
966 pthread_mutex_lock(&handle->call_status_mutex);
967 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
968 pthread_mutex_unlock(&handle->call_status_mutex);
969 ERR("the previous call is in progress : %p", ipc);
972 pthread_mutex_unlock(&handle->call_status_mutex);
974 pthread_mutex_lock(&handle->call_status_mutex);
975 handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
976 pthread_mutex_unlock(&handle->call_status_mutex);
978 handle->call_async_callback = callback;
979 handle->call_async_userdata = user_data;
981 /* add a callback for GIOChannel */
/* the channel is created once and kept on the handle */
982 if (!handle->async_channel) {
983 handle->async_channel = g_io_channel_unix_new(handle->fd);
984 if (!handle->async_channel) {
985 ERR("g_io_channel_unix_new error");
990 source_id = g_io_add_watch(handle->async_channel, G_IO_IN,
991 __pims_ipc_call_async_handler, handle);
992 handle->async_source_id = source_id;
/* on send failure the watch just added is removed again */
994 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
995 g_source_remove(source_id);
/* try once right away in case the response is already available (src == NULL path) */
999 __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
/*
 * Public API: reports whether an async call is outstanding on the handle.
 * Returns FALSE when ipc is NULL. The status is read under
 * call_status_mutex; the result assignment and return are on lines not
 * visible in this view.
 */
1004 API int pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1007 pims_ipc_s *handle = ipc;
1009 RETV_IF(NULL == ipc, FALSE);
1011 pthread_mutex_lock(&handle->call_status_mutex);
1012 if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1016 pthread_mutex_unlock(&handle->call_status_mutex);
/*
 * Public API: registers callback/user_data for the event identified by
 * module and event on a subscribe handle.
 * Returns -1 when any of ipc, module, event or callback is NULL, or when
 * the handle has no subscriber table (that is, it was not set up for
 * subscribing). The success return is on a line not visible in this view.
 */
1020 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event,
1021 pims_ipc_subscribe_cb callback, void *user_data)
1023 gchar *call_id = NULL;
1024 pims_ipc_s *handle = ipc;
1025 pims_ipc_cb_s *cb_data = NULL;
1027 RETV_IF(NULL == ipc, -1);
1028 RETV_IF(NULL == module, -1);
1029 RETV_IF(NULL == event, -1);
1030 RETV_IF(NULL == callback, -1);
1031 RETV_IF(NULL == handle->subscribe_cb_table, -1);
1033 cb_data = g_new0(pims_ipc_cb_s, 1);
1034 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1036 VERBOSE("subscribe cb id[%s]", call_id);
1037 cb_data->callback = callback;
1038 cb_data->user_data = user_data;
/* the table keeps call_id and cb_data; presumably it frees them via its destroy functions - TODO confirm */
1039 g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
/*
 * Public API: removes the subscription for module/event from the handle.
 * Returns -1 when ipc, module or event is NULL, or when the handle has no
 * subscriber table. A key that is not present is logged as an error. The
 * release of the temporary call id and the return values are on lines not
 * visible in this view.
 */
1044 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1046 gchar *call_id = NULL;
1047 pims_ipc_s *handle = ipc;
1049 RETV_IF(NULL == ipc, -1);
1050 RETV_IF(NULL == module, -1);
1051 RETV_IF(NULL == event, -1);
1052 RETV_IF(NULL == handle->subscribe_cb_table, -1);
/* same key construction as pims_ipc_subscribe() */
1054 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1056 VERBOSE("unsubscribe cb id[%s]", call_id);
1058 if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1059 ERR("g_hash_table_remove error");
/*
 * Public API: registers a server-disconnected callback for a handle. Only
 * one record per handle is accepted; a second registration is logged as
 * "Already set callback". The new record is appended to disconnected_list.
 * Loop header and return values are on lines not visible in this view.
 *
 * NOTE(review): the duplicate check and the append run under two separate
 * acquisitions of __disconnect_cb_mutex, so concurrent callers could both
 * register the same handle.
 */
1068 API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle,
1069 pims_ipc_server_disconnected_cb callback, void *user_data)
1071 GList *cursor = NULL;
1073 /* check already existed */
1074 pthread_mutex_lock(&__disconnect_cb_mutex);
1075 cursor = g_list_first(disconnected_list);
1077 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1078 if (disconnected && disconnected->handle == handle) {
1079 ERR("Already set callback");
1080 pthread_mutex_unlock(&__disconnect_cb_mutex);
1083 cursor = g_list_next(cursor);
1085 pthread_mutex_unlock(&__disconnect_cb_mutex);
1087 /* append callback */
1088 pims_ipc_server_disconnected_cb_t *disconnected = NULL;
1089 disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t));
1090 if (NULL == disconnected) {
1091 ERR("calloc() Fail");
1094 DBG("add disconnected");
1095 disconnected->handle = handle;
1096 disconnected->callback = callback;
1097 disconnected->user_data = user_data;
1099 pthread_mutex_lock(&__disconnect_cb_mutex);
1100 disconnected_list = g_list_append(disconnected_list, disconnected);
1101 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Public API: removes the server-disconnected callback record registered
 * for a handle, searching disconnected_list under __disconnect_cb_mutex.
 * The loop header, the release of the record and the return value are on
 * lines not visible in this view.
 * NOTE(review): the cursor is advanced after its link may have been
 * deleted; verify that a hidden line leaves the loop after the deletion.
 */
1106 API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle)
1108 pthread_mutex_lock(&__disconnect_cb_mutex);
1110 GList *cursor = NULL;
1111 cursor = g_list_first(disconnected_list);
1113 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1114 if (disconnected && disconnected->handle == handle) {
1116 disconnected_list = g_list_delete_link(disconnected_list, cursor);
1117 DBG("remove disconnected_cb");
1120 cursor = g_list_next(cursor);
1122 pthread_mutex_unlock(&__disconnect_cb_mutex);
1127 /* start deprecated */
/*
 * Deprecated public API: clears the single global server-disconnected
 * callback slot under __disconnect_cb_mutex. The return value is on a line
 * not visible in this view.
 */
1128 API int pims_ipc_unset_server_disconnected_cb()
1130 pthread_mutex_lock(&__disconnect_cb_mutex);
1131 _server_disconnected_cb.callback = NULL;
1132 _server_disconnected_cb.user_data = NULL;
1133 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Deprecated public API: stores callback and user_data in the single global
 * server-disconnected callback slot under __disconnect_cb_mutex, replacing
 * any previous value. The return value is on a line not visible in this view.
 */
1137 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback,
1140 pthread_mutex_lock(&__disconnect_cb_mutex);
1141 _server_disconnected_cb.callback = callback;
1142 _server_disconnected_cb.user_data = user_data;
1143 pthread_mutex_unlock(&__disconnect_cb_mutex);
1146 /* end deprecated */