4 * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <poll.h> /* pollfds */
28 #include <sys/un.h> /* sockaddr_un */
29 #include <sys/ioctl.h> /* ioctl */
30 #include <sys/socket.h> /* socket */
31 #include <sys/types.h>
32 #include <sys/epoll.h> /* epoll */
33 #include <sys/eventfd.h> /* eventfd */
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-ipc-data.h"
40 #include "pims-ipc-data-internal.h"
43 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
44 sequence_no = ++((handle)->call_sequence_no);\
47 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
50 PIMS_IPC_CALL_STATUS_READY = 0,
51 PIMS_IPC_CALL_STATUS_IN_PROGRESS
52 } pims_ipc_call_status_e;
55 PIMS_IPC_MODE_REQ = 0,
60 pims_ipc_subscribe_cb callback;
66 pims_ipc_data_h *handle;
67 } pims_ipc_subscribe_data_s;
73 GIOChannel *async_channel;
74 guint disconnected_source;
75 guint async_source_id;
76 pthread_mutex_t call_status_mutex;
77 pims_ipc_call_status_e call_status;
78 unsigned int call_sequence_no;
79 pims_ipc_call_async_cb call_async_callback;
80 void *call_async_userdata;
81 pims_ipc_data_h dhandle_for_async_idler;
84 int epoll_stop_thread;
86 GHashTable *subscribe_cb_table;
88 pthread_mutex_t data_queue_mutex;
91 pthread_mutex_t call_mutex; /* not to be interrupted while sending and receiving */
95 static unsigned int ref_cnt;
96 static GList *subscribe_handles;
97 static GList *disconnected_list;
100 pims_ipc_server_disconnected_cb callback;
103 } pims_ipc_server_disconnected_cb_t;
105 /* start deprecated */
106 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
108 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Destructor for one queued pims_ipc_subscribe_data_s entry.
 * Used as the element destructor when the data queue is released with
 * g_list_free_full(), and called directly after an entry is dispatched.
 * Destroys the IPC data handle carried by the entry.
 */
110 static void __sub_data_free(gpointer user_data)
112 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
113 pims_ipc_data_destroy(data->handle);
/*
 * Tear down a client handle and release everything it owns.
 *
 * Sequence visible here: ask the I/O thread to stop (flag + eventfd write),
 * join it, free the service string, remove the GLib watch/channel and the
 * entry in the global subscriber list, destroy the subscribe callback table
 * and the pending data queue, close subscribe_fd, remove the disconnect
 * watch, destroy the per-handle mutexes, and finally drop the global
 * reference count.
 */
118 static void __pims_ipc_free_handle(pims_ipc_s *handle)
120 pthread_mutex_lock(&__gmutex);
/* Request the I/O thread to leave its epoll loop; the eventfd write wakes it. */
122 handle->epoll_stop_thread = TRUE;
123 if (handle->eventfd != -1)
124 if (eventfd_write(handle->eventfd, 1) != 0)
125 VERBOSE("Failed to write the event \n ");
127 if (handle->fd != -1)
/* __gmutex is released around the join because __io_thread locks it itself. */
130 pthread_mutex_unlock(&__gmutex);
131 if (handle->io_thread)
132 pthread_join(handle->io_thread, NULL);
133 pthread_mutex_lock(&__gmutex);
136 g_free(handle->service);
138 if (handle->async_channel) {
139 /* remove a subscriber handle from the global list */
140 subscribe_handles = g_list_remove(subscribe_handles, handle);
141 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
143 g_source_remove(handle->async_source_id);
144 g_io_channel_unref(handle->async_channel);
147 if (handle->subscribe_cb_table)
148 g_hash_table_destroy(handle->subscribe_cb_table);
/* Drop any events that were queued but never dispatched. */
150 pthread_mutex_lock(&handle->data_queue_mutex);
151 if (handle->data_queue)
152 g_list_free_full(handle->data_queue, __sub_data_free);
154 pthread_mutex_unlock(&handle->data_queue_mutex);
155 pthread_mutex_destroy(&handle->data_queue_mutex);
157 if (handle->subscribe_fd != -1)
158 close(handle->subscribe_fd);
160 if (0 < handle->disconnected_source)
161 g_source_remove(handle->disconnected_source);
163 pthread_mutex_destroy(&handle->call_mutex);
164 pthread_mutex_destroy(&handle->call_status_mutex);
/*
 * NOTE(review): ref_cnt is declared unsigned in this file, so "<= 0" can
 * only ever mean "== 0"; a decrement from 0 would wrap instead.
 */
168 if (--ref_cnt <= 0) {
169 if (subscribe_handles)
170 g_list_free(subscribe_handles);
171 subscribe_handles = NULL;
174 pthread_mutex_unlock(&__gmutex);
/*
 * Dispatch one queued subscription event to its registered callback.
 *
 * Consumes a notification from subscribe_fd, takes the first entry of
 * handle->data_queue, looks up the callback registered for the entry's
 * call_id in subscribe_cb_table and invokes it, then unlinks and frees the
 * entry. In the lines visible here the callback runs between the lock and
 * unlock of data_queue_mutex.
 */
177 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
179 pims_ipc_cb_s *cb_data = NULL;
183 read_command(handle->subscribe_fd, &dummy);
185 pthread_mutex_lock(&handle->data_queue_mutex);
/* Nothing queued: release the lock on this path. */
186 if (!handle->data_queue) {
187 pthread_mutex_unlock(&handle->data_queue_mutex);
191 GList *cursor = g_list_first(handle->data_queue);
192 if (NULL == cursor || NULL == cursor->data) {
193 pthread_mutex_unlock(&handle->data_queue_mutex);
196 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
198 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
200 VERBOSE("unable to find %s", data->call_id);
202 cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
/* The entry is consumed whether or not a callback was found. */
204 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
205 __sub_data_free(data);
206 pthread_mutex_unlock(&handle->data_queue_mutex);
/*
 * GIOChannel watch callback installed on the subscribe eventfd.
 *
 * While holding __gmutex it verifies that the handle passed as user data is
 * still present in the global subscriber list before dispatching the queued
 * event through __pims_ipc_receive_for_subscribe().
 */
212 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
214 pims_ipc_s *handle = (pims_ipc_s *)data;
216 if (condition & G_IO_HUP)
219 pthread_mutex_lock(&__gmutex);
221 /* check whether the subscriber handle still exists */
222 if (g_list_find(subscribe_handles, handle) == NULL) {
223 ERR("No such handle that ID is %p", handle);
224 pthread_mutex_unlock(&__gmutex);
228 __pims_ipc_receive_for_subscribe(handle);
230 pthread_mutex_unlock(&__gmutex);
/*
 * Return a process-wide sequence number kept in a function-local static.
 * The value 0xffffffff marks "not yet seeded"; on that value the counter is
 * seeded from time(NULL).
 * NOTE(review): the lines between the seeding and the return are not visible
 * here; presumably the counter is advanced on later calls - confirm.
 */
235 static unsigned int __get_global_sequence_no()
237 static unsigned int __gsequence_no = 0xffffffff;
239 if (__gsequence_no == 0xffffffff)
240 __gsequence_no = (unsigned int)time(NULL);
243 return __gsequence_no;
/*
 * Send the identify message for this handle over handle->fd.
 *
 * Message layout, in order:
 *   total_len | client_id_len | client_id bytes | seq_no
 * where total_len is the size of the whole message including itself.
 * Returns whatever socket_send() returns.
 */
246 static int __pims_ipc_send_identify(pims_ipc_s *handle)
248 unsigned int total_len, seq_no;
249 unsigned int client_id_len = strlen(handle->id);
251 total_len = sizeof(total_len) + sizeof(client_id_len)+client_id_len + sizeof(seq_no);
/* Variable-length stack buffer sized from the client id length. */
254 char buf[total_len+1];
255 memset(buf, 0x0, total_len+1);
257 memcpy(buf, &total_len, sizeof(total_len));
258 length += sizeof(total_len);
260 memcpy(buf+length, &(client_id_len), sizeof(client_id_len));
261 length += sizeof(client_id_len);
262 memcpy(buf+length, handle->id, client_id_len);
263 length += client_id_len;
/* Advances handle->call_sequence_no and uses the new value for this message. */
265 GET_CALL_SEQUNECE_NO(handle, seq_no);
266 memcpy(buf+length, &(seq_no), sizeof(seq_no));
267 length += sizeof(seq_no);
269 return socket_send(handle->fd, buf, length);
/*
 * Read one response message from handle->fd.
 *
 * Fields are read in this order:
 *   total_len | client_id_len | client_id | seq_no
 * If the bytes counted so far equal total_len, the message has no further
 * fields and an empty data object holding the client id is created.
 * Otherwise the remaining fields are read:
 *   call_id_len | call_id | has_data | data_len | payload
 * and the payload is unmarshalled into a data object.
 *
 * The response is matched against handle->call_sequence_no; on a mismatch
 * the data object is destroyed and the mismatch is logged.
 *
 * NOTE(review): socket_recv() is given the address of the pointer variable
 * (&client_id, &call_id, &buf), not the buffer itself; presumably it takes a
 * pointer-to-pointer - confirm against its declaration.
 */
272 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
275 gboolean is_ok = FALSE;
277 pims_ipc_data_h data = NULL;
278 unsigned int seq_no = 0;
279 char *client_id = NULL;
280 char *call_id = NULL;
283 /* read the size of message. note that ioctl is non-blocking */
284 if (ioctl(handle->fd, FIONREAD, &len)) {
285 ERR("ioctl failed: %d", errno);
289 /* when server or client closed socket */
291 ERR("[IPC Socket] connection is closed");
296 unsigned int read_len = 0;
297 unsigned int total_len = 0;
298 unsigned int client_id_len = 0;
299 unsigned int call_id_len = 0;
300 unsigned int has_data = FALSE;
302 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
304 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &(client_id_len), sizeof(client_id_len)));
/* One extra byte is allocated so the id stays NUL-terminated. */
305 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
306 client_id = calloc(1, client_id_len+1);
307 if (client_id == NULL) {
313 ret = socket_recv(handle->fd, (void *)&client_id, client_id_len);
314 if (ret < 0) { ERR("socket_recv error"); break; }
317 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &seq_no, sizeof(seq_no)));
/* Whole message consumed already: no call id or payload follows. */
318 if (total_len == read_len) {
320 data = pims_ipc_data_create(0);
322 ERR("pims_ipc_data_create() Fail");
325 ret = pims_ipc_data_put(data, client_id, client_id_len);
327 WARN("pims_ipc_data_put fail(%d)", ret);
331 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
332 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
333 call_id = calloc(1, call_id_len+1);
334 if (call_id == NULL) {
341 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
342 if (ret < 0) { ERR("socket_recv error"); break; }
345 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
347 unsigned int data_len;
348 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
349 if (data_len > 0 && data_len < UINT_MAX-1) {
350 buf = calloc(1, data_len+1);
357 ret = socket_recv(handle->fd, (void *)&buf, data_len);
358 if (ret < 0) { ERR("socket_recv error"); break; }
361 data = pims_ipc_data_steal_unmarshal(buf, data_len);
363 ERR("pims_ipc_data_steal_unmarshal() Fail");
368 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, seq_no);
/* Only a response carrying the sequence number of the latest request is accepted. */
374 if (seq_no == handle->call_sequence_no) {
375 if (data_out != NULL)
378 pims_ipc_data_destroy(data);
382 pims_ipc_data_destroy(data);
383 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, seq_no);
/*
 * Wait for a response on handle->fd and read it.
 *
 * Polls the socket for POLLIN/POLLERR/POLLHUP with a 4500 ms timeout.
 * POLLERR or POLLHUP is logged as a server disconnect; POLLIN hands over to
 * __pims_ipc_read_data(), which fills data_out.
 * NOTE(review): the surrounding loop and the statements taken on
 * EINTR/EAGAIN and on timeout are not visible here.
 */
392 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
395 struct pollfd pollfds[1];
397 pollfds[0].fd = handle->fd;
398 pollfds[0].events = POLLIN | POLLERR | POLLHUP;
399 pollfds[0].revents = 0;
403 ret = poll(pollfds, 1, 4500);
404 if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
411 ERR("timedout errno[%d], revents[%d]", errno, pollfds[0].revents);
415 if (pollfds[0].revents & (POLLERR|POLLHUP)) {
416 ERR("Server disconnected ret[%d]", ret);
420 if (pollfds[0].revents & POLLIN) {
421 ret = __pims_ipc_read_data(handle, data_out);
/*
 * Create the eventfd used to signal that subscription data is queued,
 * switch it to non-blocking mode, and store it in handle->subscribe_fd.
 */
430 static int __open_subscribe_fd(pims_ipc_s *handle)
435 int subscribe_fd = eventfd(0, 0);
436 if (-1 == subscribe_fd) {
437 ERR("eventfd error : %d", errno);
440 VERBOSE("subscribe :%d\n", subscribe_fd);
/* Add O_NONBLOCK on top of the descriptor's current status flags. */
442 flags = fcntl(subscribe_fd, F_GETFL, 0);
446 ret = fcntl(subscribe_fd, F_SETFL, flags | O_NONBLOCK);
448 ERR("fcntl() Fail(%d)", errno);
450 handle->subscribe_fd = subscribe_fd;
/*
 * Read one published event from handle->fd and queue it for dispatch.
 *
 * Fields are read in this order:
 *   total_len | call_id_len | call_id | has_data | data_len | payload
 * The payload is unmarshalled into a data object, wrapped together with the
 * call id in a pims_ipc_subscribe_data_s, appended to handle->data_queue
 * under data_queue_mutex, and subscribe_fd is then written to so the
 * watcher on that descriptor wakes up.
 *
 * Called from __io_thread(), which treats a negative return as the server
 * connection being closed.
 */
454 static int __subscribe_data(pims_ipc_s * handle)
458 char *call_id = NULL;
460 pims_ipc_data_h dhandle = NULL;
463 /* read the size of message. note that ioctl is non-blocking */
464 if (ioctl(handle->fd, FIONREAD, &len)) {
465 ERR("ioctl failed: %d", errno);
469 /* when server or client closed socket */
471 INFO("[IPC Socket] connection is closed");
476 unsigned int read_len = 0;
477 unsigned int total_len = 0;
478 unsigned int call_id_len = 0;
479 unsigned int has_data = FALSE;
481 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
483 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
/* One extra byte is allocated so the call id stays NUL-terminated. */
484 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
485 call_id = calloc(1, call_id_len+1);
486 if (call_id == NULL) {
493 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
494 if (ret < 0) { ERR("socket_recv error"); break; }
497 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
500 unsigned int data_len;
501 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
502 if (data_len > 0 && data_len < UINT_MAX-1) {
503 buf = calloc(1, data_len+1);
510 ret = socket_recv(handle->fd, (void *)&buf, data_len);
512 ERR("socket_recv error(%d)", ret);
517 dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
518 if (NULL == dhandle) {
519 ERR("pims_ipc_data_steal_unmarshal() Fail");
523 pims_ipc_subscribe_data_s *sub_data;
524 sub_data = calloc(1, sizeof(pims_ipc_subscribe_data_s));
525 if (NULL == sub_data) {
526 ERR("calloc() Fail");
527 pims_ipc_data_destroy(dhandle);
/* The queue entry takes over both the data object and the call_id string. */
531 sub_data->handle = dhandle;
532 sub_data->call_id = call_id;
535 pthread_mutex_lock(&handle->data_queue_mutex);
536 handle->data_queue = g_list_append(handle->data_queue, sub_data);
537 pthread_mutex_unlock(&handle->data_queue_mutex);
/* Signal the subscribe eventfd so the queued entry gets dispatched. */
538 write_command(handle->subscribe_fd, 1);
/*
 * Idle callback scheduled by __io_thread() when reading from the server
 * fails. Walks disconnected_list under __disconnect_cb_mutex and invokes the
 * callback of every entry registered for the handle passed as data.
 * NOTE(review): the NULL check of disconnected_list happens before the mutex
 * is taken.
 */
548 static gboolean __hung_up_cb(gpointer data)
550 GList *cursor = NULL;
552 if (NULL == disconnected_list) {
553 DBG("No disconnected list");
557 pthread_mutex_lock(&__disconnect_cb_mutex);
558 cursor = g_list_first(disconnected_list);
560 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
561 if (disconnected && disconnected->handle == data && disconnected->callback) {
562 DBG("call hung_up callback");
563 disconnected->callback(disconnected->user_data);
566 cursor = g_list_next(cursor);
568 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Worker thread body for a subscriber handle (started by __pims_ipc_create).
 *
 * Creates an epoll instance, creates handle->eventfd (written by
 * __pims_ipc_free_handle to wake this thread) and registers both the eventfd
 * and the server socket handle->fd. It then waits in epoll_wait() and, for
 * each event on the socket:
 *   - EPOLLHUP: logs and sets epoll_stop_thread;
 *   - EPOLLIN : calls __subscribe_data(); on failure schedules __hung_up_cb
 *               on the GLib main loop and sets epoll_stop_thread.
 * epoll_stop_thread is checked under __gmutex both before and after each
 * wait. handle->eventfd is closed on the way out.
 */
573 static void* __io_thread(void *data)
575 pims_ipc_s *handle = data;
576 struct epoll_event ev = {0};
580 epfd = epoll_create(MAX_EPOLL_EVENT);
582 pthread_mutex_lock(&__gmutex);
584 handle->eventfd = eventfd(0, EFD_NONBLOCK);
585 ev.data.fd = handle->eventfd;
587 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->eventfd, &ev);
588 WARN_IF(ret != 0, "listen error :%d", ret);
590 ev.events = EPOLLIN | EPOLLHUP;
591 ev.data.fd = handle->fd;
592 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
593 WARN_IF(ret != 0, "listen error :%d", ret);
594 pthread_mutex_unlock(&__gmutex);
/* Stop request check before blocking. */
600 pthread_mutex_lock(&__gmutex);
601 if (handle->epoll_stop_thread) {
602 pthread_mutex_unlock(&__gmutex);
605 pthread_mutex_unlock(&__gmutex);
/* Block without timeout; __gmutex is not held while waiting. */
607 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
608 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1);
/* Re-check the stop flag: the wake-up may have come from the eventfd. */
610 pthread_mutex_lock(&__gmutex);
612 if (handle->epoll_stop_thread) {
613 DBG("__io_thread epoll_stop_thread");
614 pthread_mutex_unlock(&__gmutex);
617 pthread_mutex_unlock(&__gmutex);
619 if (event_num == -1) {
620 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
621 ERR("errno:%d\n", errno);
626 pthread_mutex_lock(&__gmutex);
627 for (i = 0; i < event_num; i++) {
628 if (events[i].events & EPOLLHUP) {
629 ERR("server fd closed");
630 handle->epoll_stop_thread = TRUE;
634 if (events[i].events & EPOLLIN) {
635 if (__subscribe_data(handle) < 0) {
636 ERR("server fd closed");
637 g_idle_add(__hung_up_cb, handle);
638 handle->epoll_stop_thread = TRUE;
643 pthread_mutex_unlock(&__gmutex);
647 close(handle->eventfd);
/*
 * GIOChannel watch callback installed on the request-mode socket by
 * __pims_ipc_create() for G_IO_IN|G_IO_HUP.
 *
 * Distinguishes G_IO_HUP from G_IO_IN; on G_IO_IN it peeks at the socket
 * with recv(MSG_PEEK) and tests for a return value of 0. Any other condition
 * is logged as invalid.
 * NOTE(review): the statements executed in each branch are not visible
 * here; presumably they report the server disconnect - confirm.
 */
652 static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data)
654 if (G_IO_HUP & condition) {
659 } else if (G_IO_IN & condition) {
661 if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) {
667 ERR("Invalid condition (%d)", condition);
/*
 * Create a client handle connected to the UNIX-domain socket named by
 * service, in request or subscribe mode.
 *
 * Common setup: allocate the handle, build the client id from the pid and
 * the global sequence number, open a non-blocking AF_UNIX stream socket,
 * initialise the call mutexes and connect to the server.
 *
 * PIMS_IPC_MODE_REQ: watch the socket with _g_io_hup_cb, seed
 * call_sequence_no from time(NULL), send the identify message, wait for its
 * response, then issue the internal PIMS_IPC_FUNCTION_CREATE call.
 *
 * Remaining setup (apparently the non-REQ branch; the line separating the
 * two branches is not visible here): initialise the data queue, open the
 * subscribe eventfd, start __io_thread, watch the eventfd with
 * __pims_ipc_subscribe_handler, create the callback table and register the
 * handle in the global subscriber list.
 *
 * The whole setup runs under __gmutex. On failure (is_ok still FALSE) the
 * handle is released with __pims_ipc_free_handle() after the mutex is
 * unlocked, since that function takes __gmutex itself.
 */
672 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
674 pims_ipc_s *handle = NULL;
675 gboolean is_ok = FALSE;
677 pthread_mutex_lock(&__gmutex);
680 struct sockaddr_un server_addr;
684 VERBOSE("Create %d th..", ref_cnt);
686 handle = g_new0(pims_ipc_s, 1);
687 if (handle == NULL) {
688 ERR("Failed to allocation");
/* -1 / 0 mark "not opened" so the free path can skip these resources. */
692 handle->subscribe_fd = -1;
693 handle->io_thread = 0;
694 handle->eventfd = -1;
695 handle->service = g_strdup(service);
696 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
697 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
698 if (handle->fd < 0) {
699 ERR("socket error : %d, errno: %d", handle->fd, errno);
702 int flags = fcntl(handle->fd, F_GETFL, 0);
705 ret = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
707 ERR("fcntl() Fail(%d)", errno);
709 pthread_mutex_init(&handle->call_status_mutex, 0);
711 pthread_mutex_lock(&handle->call_status_mutex);
712 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
713 pthread_mutex_unlock(&handle->call_status_mutex);
715 pthread_mutex_init(&handle->call_mutex, 0);
/* The service string is used as the socket path. */
717 bzero(&server_addr, sizeof(server_addr));
718 server_addr.sun_family = AF_UNIX;
719 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
721 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
723 ERR("connect error : %d, errno: %d", ret, errno);
726 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
727 DBG("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
729 if (mode == PIMS_IPC_MODE_REQ) {
/* The watch keeps its own reference to the channel, so the local one is dropped. */
730 GIOChannel *ch = g_io_channel_unix_new(handle->fd);
731 handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP,
732 _g_io_hup_cb, handle);
733 g_io_channel_unref(ch);
735 handle->call_sequence_no = (unsigned int)time(NULL);
736 ret = __pims_ipc_send_identify(handle);
738 ERR("__pims_ipc_send_identify error");
741 ret = __pims_ipc_receive(handle, NULL);
743 ERR("__pims_ipc_receive error");
747 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0)
748 WARN("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
751 handle->epoll_stop_thread = FALSE;
752 pthread_mutex_init(&handle->data_queue_mutex, 0);
754 pthread_mutex_lock(&handle->data_queue_mutex);
755 handle->data_queue = NULL;
756 pthread_mutex_unlock(&handle->data_queue_mutex);
758 ret = __open_subscribe_fd(handle);
763 ret = pthread_create(&worker, NULL, __io_thread, handle);
766 handle->io_thread = worker;
768 GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
769 if (!async_channel) {
770 ERR("g_io_channel_unix_new error");
773 handle->async_channel = async_channel;
774 handle->async_source_id = g_io_add_watch(handle->async_channel,
775 G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
776 handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal,
778 ASSERT(handle->subscribe_cb_table);
780 /* add a subscriber handle to the global list */
781 subscribe_handles = g_list_append(subscribe_handles, handle);
782 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
786 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
789 pthread_mutex_unlock(&__gmutex);
791 if (FALSE == is_ok) {
793 __pims_ipc_free_handle(handle);
/*
 * Public API: create a request-mode handle connected to the given service
 * socket. Thin wrapper over __pims_ipc_create() with PIMS_IPC_MODE_REQ.
 */
801 API pims_ipc_h pims_ipc_create(char *service)
803 return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
/*
 * Public API: create a subscribe-mode handle connected to the given service
 * socket. Thin wrapper over __pims_ipc_create() with PIMS_IPC_MODE_SUB.
 */
806 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
808 return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
/*
 * Destroy a handle created by __pims_ipc_create().
 * In request mode the server is first sent the internal
 * PIMS_IPC_FUNCTION_DESTROY call (a failure is only logged); the handle is
 * then released with __pims_ipc_free_handle().
 */
811 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
813 pims_ipc_s *handle = ipc;
815 if (mode == PIMS_IPC_MODE_REQ) {
816 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY,
818 WARN("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
823 __pims_ipc_free_handle(handle);
/*
 * Public API: destroy a request-mode handle.
 * Thin wrapper over __pims_ipc_destroy() with PIMS_IPC_MODE_REQ.
 */
826 API void pims_ipc_destroy(pims_ipc_h ipc)
828 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
/*
 * Public API: destroy a subscribe-mode handle.
 * Thin wrapper over __pims_ipc_destroy() with PIMS_IPC_MODE_SUB.
 */
831 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
833 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
/*
 * Serialise and send one request for module/function over handle->fd.
 *
 * Header layout, in order:
 *   total_len | client_id_len | client_id | seq_no |
 *   call_id_len | call_id | has_data [| data size]
 * When a payload is present the header additionally carries
 * data->buf_size, total_len includes the payload size, and the payload
 * itself is sent separately with socket_send_data() after the header.
 * Without a payload only the header is sent.
 *
 * A fresh sequence number is taken from the handle for every request.
 * NOTE(review): the declaration of buf and the has_data/data_in checks are
 * not visible here.
 */
836 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
840 unsigned int total_len;
841 unsigned int seq_no = 0;
842 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
843 unsigned int call_id_len = strlen(call_id);
844 pims_ipc_data_s *data = NULL;
845 unsigned int has_data = FALSE;
846 unsigned int client_id_len = strlen(handle->id);
848 GET_CALL_SEQUNECE_NO(handle, seq_no);
/* len is the header size; total_len additionally counts the payload. */
850 int len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(seq_no)
851 + call_id_len + sizeof(call_id_len) + sizeof(has_data);
857 len += sizeof(unsigned int);
858 total_len = len + data->buf_size;
861 INFO("len(%d),client_id(%s),call_id(%s),seq_no(%d)", len, handle->id, call_id, seq_no);
864 memset(buf, 0x0, len+1);
866 memcpy(buf, &total_len, sizeof(total_len));
867 length += sizeof(total_len);
869 client_id_len = strlen(handle->id);
870 memcpy(buf+length, &client_id_len, sizeof(client_id_len));
871 length += sizeof(client_id_len);
872 memcpy(buf+length, handle->id, client_id_len);
873 length += client_id_len;
875 memcpy(buf+length, &seq_no, sizeof(seq_no));
876 length += sizeof(seq_no);
878 memcpy(buf+length, &call_id_len, sizeof(call_id_len));
879 length += sizeof(call_id_len);
880 memcpy(buf+length, call_id, call_id_len);
881 length += call_id_len;
884 memcpy(buf+length, &has_data, sizeof(has_data));
885 length += sizeof(has_data);
888 memcpy(buf+length, &(data->buf_size), sizeof(data->buf_size));
889 length += sizeof(data->buf_size);
/* With payload: header first, then the payload bytes. */
891 ret = socket_send(handle->fd, buf, length);
893 ret = socket_send_data(handle->fd, data->buf, data->buf_size);
/* Without payload: header only. */
895 ret = socket_send(handle->fd, buf, length);
/*
 * Public API: synchronous call of module/function on the server.
 *
 * Returns -1 when ipc, module or function is NULL. The call is refused while
 * the handle's call_status is not READY (an asynchronous call is pending).
 * Sending the request and receiving the response both happen while
 * call_mutex is held, so the exchange is not interleaved with another call
 * on the same handle. The response, if any, is delivered through data_out.
 */
904 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
905 pims_ipc_data_h *data_out)
907 pims_ipc_s *handle = ipc;
909 RETV_IF(NULL == ipc, -1);
910 RETV_IF(NULL == module, -1);
911 RETV_IF(NULL == function, -1);
913 pthread_mutex_lock(&handle->call_status_mutex);
914 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
915 pthread_mutex_unlock(&handle->call_status_mutex);
916 ERR("the previous call is in progress : %p", ipc);
919 pthread_mutex_unlock(&handle->call_status_mutex);
922 pthread_mutex_lock(&handle->call_mutex);
924 ret = __pims_ipc_send(handle, module, function, data_in);
928 ret = __pims_ipc_receive(handle, data_out);
932 pthread_mutex_unlock(&handle->call_mutex);
/*
 * Idle callback that completes an asynchronous call whose response arrived
 * before control returned to the main loop.
 *
 * Takes the parked response out of handle->dhandle_for_async_idler, marks
 * the handle READY again, invokes the user's async callback with the
 * response and then destroys the response data.
 */
937 static gboolean __call_async_idler_cb(gpointer data)
939 pims_ipc_s *handle = data;
940 pims_ipc_data_h dhandle;
942 RETV_IF(NULL == handle, FALSE);
944 dhandle = handle->dhandle_for_async_idler;
945 handle->dhandle_for_async_idler = NULL;
/* Status is reset before the callback runs, so the callback sees READY. */
947 pthread_mutex_lock(&handle->call_status_mutex);
948 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
949 pthread_mutex_unlock(&handle->call_status_mutex);
951 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
952 pims_ipc_data_destroy(dhandle);
/*
 * Receive the response of an asynchronous call and deliver it.
 *
 * Installed as a GIOChannel watch by pims_ipc_call_async(), which also calls
 * it directly with src == NULL right after sending the request.
 * If a response is received but no call is IN_PROGRESS, the response is
 * destroyed. Otherwise:
 *   - src == NULL (direct invocation): the response is parked in
 *     dhandle_for_async_idler and delivery is deferred to an idle callback;
 *   - otherwise: the handle is marked READY, the user's callback is invoked
 *     and the response data is destroyed.
 */
957 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition,
960 pims_ipc_s *handle = data;
961 pims_ipc_data_h dhandle = NULL;
963 if (__pims_ipc_receive(handle, &dhandle) == 0) {
964 VERBOSE("call status = %d", handle->call_status);
966 pthread_mutex_lock(&handle->call_status_mutex);
967 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
968 pthread_mutex_unlock(&handle->call_status_mutex);
969 pims_ipc_data_destroy(dhandle);
971 pthread_mutex_unlock(&handle->call_status_mutex);
972 if (src == NULL) { /* A response is arrived too quickly */
973 handle->dhandle_for_async_idler = dhandle;
974 g_idle_add(__call_async_idler_cb, handle);
976 pthread_mutex_lock(&handle->call_status_mutex);
977 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
978 pthread_mutex_unlock(&handle->call_status_mutex);
980 handle->call_async_callback((pims_ipc_h)handle, dhandle,
981 handle->call_async_userdata);
982 pims_ipc_data_destroy(dhandle);
/*
 * Public API: asynchronous call of module/function on the server.
 *
 * Returns -1 when ipc, module, function or callback is NULL. The call is
 * refused while call_status is not READY. Otherwise the status becomes
 * IN_PROGRESS, the callback and its user data are stored on the handle, a
 * GIOChannel watch for G_IO_IN on the socket is installed with
 * __pims_ipc_call_async_handler, and the request is sent. If sending fails
 * the watch is removed again. After a successful send the handler is invoked
 * once directly (src == NULL) to pick up a response that is already there.
 *
 * NOTE(review): the READY check and the switch to IN_PROGRESS are done in
 * two separate lock/unlock sections of call_status_mutex, so two threads
 * could both pass the check before either sets IN_PROGRESS.
 */
989 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function,
990 pims_ipc_data_h data_in, pims_ipc_call_async_cb callback, void *user_data)
992 pims_ipc_s *handle = ipc;
995 RETV_IF(NULL == ipc, -1);
996 RETV_IF(NULL == module, -1);
997 RETV_IF(NULL == function, -1);
998 RETV_IF(NULL == callback, -1);
1000 pthread_mutex_lock(&handle->call_status_mutex);
1001 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
1002 pthread_mutex_unlock(&handle->call_status_mutex);
1003 ERR("the previous call is in progress : %p", ipc);
1006 pthread_mutex_unlock(&handle->call_status_mutex);
1008 pthread_mutex_lock(&handle->call_status_mutex);
1009 handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
1010 pthread_mutex_unlock(&handle->call_status_mutex);
1012 handle->call_async_callback = callback;
1013 handle->call_async_userdata = user_data;
1015 /* add a callback for GIOChannel */
/* The channel is created once per handle and reused by later calls. */
1016 if (!handle->async_channel) {
1017 handle->async_channel = g_io_channel_unix_new(handle->fd);
1018 if (!handle->async_channel) {
1019 ERR("g_io_channel_unix_new error");
1024 source_id = g_io_add_watch(handle->async_channel, G_IO_IN,
1025 __pims_ipc_call_async_handler, handle);
1026 handle->async_source_id = source_id;
1028 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
1029 g_source_remove(source_id);
1033 __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
/*
 * Public API: report whether an asynchronous call is pending on the handle.
 * Returns FALSE for a NULL handle; otherwise call_status is compared with
 * PIMS_IPC_CALL_STATUS_IN_PROGRESS under call_status_mutex.
 */
1038 API int pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1041 pims_ipc_s *handle = ipc;
1043 RETV_IF(NULL == ipc, FALSE);
1045 pthread_mutex_lock(&handle->call_status_mutex);
1046 if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1050 pthread_mutex_unlock(&handle->call_status_mutex);
/*
 * Public API: register callback/user_data for the event identified by
 * module and event on a subscribe-mode handle.
 *
 * Returns -1 when any of ipc, module, event or callback is NULL, or when the
 * handle has no subscribe callback table. The call id built from
 * module/event is the key under which the callback record is inserted into
 * subscribe_cb_table.
 */
1054 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event,
1055 pims_ipc_subscribe_cb callback, void *user_data)
1057 gchar *call_id = NULL;
1058 pims_ipc_s *handle = ipc;
1059 pims_ipc_cb_s *cb_data = NULL;
1061 RETV_IF(NULL == ipc, -1);
1062 RETV_IF(NULL == module, -1);
1063 RETV_IF(NULL == event, -1);
1064 RETV_IF(NULL == callback, -1);
1065 RETV_IF(NULL == handle->subscribe_cb_table, -1);
1067 cb_data = g_new0(pims_ipc_cb_s, 1);
1068 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1070 VERBOSE("subscribe cb id[%s]", call_id);
1071 cb_data->callback = callback;
1072 cb_data->user_data = user_data;
/* The table receives both the key string and the callback record. */
1073 g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
/*
 * Public API: remove the callback registered for the event identified by
 * module and event.
 *
 * Returns -1 when ipc, module or event is NULL, or when the handle has no
 * subscribe callback table. A failed removal from the table (no such key)
 * is logged as an error.
 */
1078 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1080 gchar *call_id = NULL;
1081 pims_ipc_s *handle = ipc;
1083 RETV_IF(NULL == ipc, -1);
1084 RETV_IF(NULL == module, -1);
1085 RETV_IF(NULL == event, -1);
1086 RETV_IF(NULL == handle->subscribe_cb_table, -1);
1088 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1090 VERBOSE("unsubscribe cb id[%s]", call_id);
1092 if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1093 ERR("g_hash_table_remove error");
/*
 * Public API: register a server-disconnected callback for a handle.
 *
 * Only one callback per handle is accepted: if disconnected_list already
 * holds an entry for the handle, the request is rejected and logged.
 * Otherwise a new entry with handle, callback and user_data is allocated
 * and appended to disconnected_list.
 *
 * NOTE(review): the duplicate check and the append are done in two separate
 * lock/unlock sections of __disconnect_cb_mutex.
 */
1102 API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle,
1103 pims_ipc_server_disconnected_cb callback, void *user_data)
1105 GList *cursor = NULL;
1107 /* check whether a callback is already registered for this handle */
1108 pthread_mutex_lock(&__disconnect_cb_mutex);
1109 cursor = g_list_first(disconnected_list);
1111 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1112 if (disconnected && disconnected->handle == handle) {
1113 ERR("Already set callback");
1114 pthread_mutex_unlock(&__disconnect_cb_mutex);
1117 cursor = g_list_next(cursor);
1119 pthread_mutex_unlock(&__disconnect_cb_mutex);
1121 /* append callback */
1122 pims_ipc_server_disconnected_cb_t *disconnected = NULL;
1123 disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t));
1124 if (NULL == disconnected) {
1125 ERR("calloc() Fail");
1128 DBG("add disconnected");
1129 disconnected->handle = handle;
1130 disconnected->callback = callback;
1131 disconnected->user_data = user_data;
1133 pthread_mutex_lock(&__disconnect_cb_mutex);
1134 disconnected_list = g_list_append(disconnected_list, disconnected);
1135 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Public API: remove the server-disconnected callback registered for a
 * handle. Walks disconnected_list under __disconnect_cb_mutex and unlinks
 * the entry whose handle matches.
 * NOTE(review): the lines that release the entry and leave the loop after
 * the unlink are not visible here.
 */
1140 API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle)
1142 pthread_mutex_lock(&__disconnect_cb_mutex);
1144 GList *cursor = NULL;
1145 cursor = g_list_first(disconnected_list);
1147 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1148 if (disconnected && disconnected->handle == handle) {
1150 disconnected_list = g_list_delete_link(disconnected_list, cursor);
1151 DBG("remove disconnected_cb");
1154 cursor = g_list_next(cursor);
1156 pthread_mutex_unlock(&__disconnect_cb_mutex);
1161 /* start deprecated */
/*
 * Deprecated public API: clear the single global server-disconnected
 * callback and its user data, under __disconnect_cb_mutex.
 */
1162 API int pims_ipc_unset_server_disconnected_cb()
1164 pthread_mutex_lock(&__disconnect_cb_mutex);
1165 _server_disconnected_cb.callback = NULL;
1166 _server_disconnected_cb.user_data = NULL;
1167 pthread_mutex_unlock(&__disconnect_cb_mutex);
/*
 * Deprecated public API: store the single global server-disconnected
 * callback and its user data, under __disconnect_cb_mutex. Any previously
 * stored callback is overwritten.
 */
1171 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback,
1174 pthread_mutex_lock(&__disconnect_cb_mutex);
1175 _server_disconnected_cb.callback = callback;
1176 _server_disconnected_cb.user_data = user_data;
1177 pthread_mutex_unlock(&__disconnect_cb_mutex);
1180 /* end deprecated */