4 * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <poll.h> // pollfds
28 #include <sys/un.h> // sockaddr_un
29 #include <sys/ioctl.h> // ioctl
30 #include <sys/socket.h> //socket
31 #include <sys/types.h>
32 #include <sys/epoll.h> // epoll
33 #include <sys/eventfd.h> // eventfd
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-debug.h"
40 #include "pims-ipc-data.h"
41 #include "pims-ipc-data-internal.h"
/* Pre-increment the handle's call_sequence_no and store the new value in
 * sequence_no. The identifier is spelled "SEQUNECE"; the senders in this file
 * use that spelling. */
44 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
45 sequence_no = ++((handle)->call_sequence_no);\
/* File-wide lock. In this file it is held while a handle is created or freed,
 * while a handle is looked up in subscribe_handles, and while the I/O thread
 * reads or sets epoll_stop_thread. */
48 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
/* Call state of a handle: READY is the initial state and is restored just
 * before an async callback runs; IN_PROGRESS is set by pims_ipc_call_async(). */
52 PIMS_IPC_CALL_STATUS_READY = 0,
53 PIMS_IPC_CALL_STATUS_IN_PROGRESS
54 } pims_ipc_call_status_e;
/* Request/response mode, the one passed by pims_ipc_create() and
 * pims_ipc_destroy(). */
58 PIMS_IPC_MODE_REQ = 0,
/* Subscriber callback; pims_ipc_subscribe() stores it and the subscribe
 * dispatcher invokes it for a matching call_id.
 * (presumably a member of pims_ipc_cb_s - confirm) */
64 pims_ipc_subscribe_cb callback;
/* Payload of one queued subscription event.
 * NOTE(review): declared as a pointer to pims_ipc_data_h, but this file assigns
 * a plain pims_ipc_data_h to it and passes it to pims_ipc_data_destroy() -
 * confirm the intended type against pims-ipc-data.h. */
71 pims_ipc_data_h *handle;
72 }pims_ipc_subscribe_data_s;
/* GLib channel used for asynchronous delivery: wraps subscribe_fd for
 * subscriber handles, or fd once pims_ipc_call_async() has been used. */
79 GIOChannel *async_channel;
/* Source id of the watch that runs _g_io_hup_cb on the socket. */
80 guint disconnected_source;
/* Source id of the watch installed on async_channel. */
81 guint async_source_id;
/* Guards call_status. */
82 pthread_mutex_t call_status_mutex;
83 pims_ipc_call_status_e call_status;
/* Seeded from time(NULL) at creation and pre-incremented for every message
 * sent; responses are matched against it. */
84 unsigned int call_sequence_no;
/* Callback and user data recorded by pims_ipc_call_async(). */
85 pims_ipc_call_async_cb call_async_callback;
86 void *call_async_userdata;
/* Response parked for __call_async_idler_cb when it arrived before the main
 * loop could dispatch it. */
87 pims_ipc_data_h dhandle_for_async_idler;
/* Set to true to ask __io_thread to exit. */
90 int epoll_stop_thread;
/* Maps call_id strings to pims_ipc_cb_s entries. */
92 GHashTable *subscribe_cb_table;
/* Guards data_queue. */
94 pthread_mutex_t data_queue_mutex;
/* Count of live handles; decremented in __pims_ipc_free_handle(). */
98 static unsigned int ref_cnt;
/* Handles appended by __pims_ipc_create(); __pims_ipc_subscribe_handler()
 * checks membership before dispatching to a handle. */
99 static GList *subscribe_handles;
/* List of pims_ipc_server_disconnected_cb_t registrations, one per handle. */
100 static GList *disconnected_list;
/* Function called by __hung_up_cb when the registration's handle matches. */
103 pims_ipc_server_disconnected_cb callback;
106 } pims_ipc_server_disconnected_cb_t;
108 /* start deprecated */
/* Single global registration written by the deprecated set/unset API. */
109 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
/* Held when disconnected_list or _server_disconnected_cb is modified. */
111 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Release one queued subscription event. The gpointer signature lets it be
 * passed to g_list_free_full() as the per-element destructor; it is also
 * called directly after an event has been dispatched. */
113 static void __sub_data_free(gpointer user_data)
115 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
// Destroy the payload carried by the event.
116 pims_ipc_data_destroy(data->handle);
/* Tear down a handle: stop and join its I/O thread, detach it from the GLib
 * main loop and the global subscriber list, and release owned resources. */
121 static void __pims_ipc_free_handle(pims_ipc_s *handle)
123 pthread_mutex_lock(&__gmutex);
// Ask __io_thread to exit; the thread polls this flag under __gmutex.
125 handle->epoll_stop_thread = true;
127 if (handle->fd != -1)
/* Drop __gmutex around the join: the I/O thread takes the same lock to read
 * the stop flag, so joining while holding it could block forever. */
130 pthread_mutex_unlock(&__gmutex);
131 if (handle->io_thread)
132 pthread_join(handle->io_thread, NULL);
133 pthread_mutex_lock(&__gmutex);
136 g_free(handle->service);
138 if (handle->async_channel) {
139 // remove the subscriber handle from the global list
140 subscribe_handles = g_list_remove(subscribe_handles, handle);
141 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
// Remove the main-loop watch before dropping the channel reference.
143 g_source_remove(handle->async_source_id);
144 g_io_channel_unref(handle->async_channel);
147 if (handle->subscribe_cb_table)
148 g_hash_table_destroy(handle->subscribe_cb_table);
// Discard events that were queued but never dispatched.
150 pthread_mutex_lock(&handle->data_queue_mutex);
151 if (handle->data_queue) {
152 g_list_free_full(handle->data_queue, __sub_data_free);
154 pthread_mutex_unlock(&handle->data_queue_mutex);
155 pthread_mutex_destroy(&handle->data_queue_mutex);
157 if (handle->subscribe_fd != -1)
158 close(handle->subscribe_fd);
160 g_source_remove(handle->disconnected_source);
161 pthread_mutex_destroy(&handle->call_status_mutex);
/* Last handle gone: release the global subscriber list.
 * NOTE(review): ref_cnt is unsigned, so "<= 0" can only be true for == 0, and
 * a decrement from 0 would wrap - confirm the counter cannot underflow. */
165 if (--ref_cnt <= 0) {
166 if (subscribe_handles)
167 g_list_free(subscribe_handles);
168 subscribe_handles = NULL;
171 pthread_mutex_unlock(&__gmutex);
/* Main-loop side of subscription delivery: consume the wake-up written to
 * subscribe_fd, take the event at the head of data_queue and pass it to the
 * callback registered for its call_id. */
174 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
176 pims_ipc_cb_s *cb_data = NULL;
// Read (and thereby reset) the eventfd notification from the I/O thread.
180 read_command(handle->subscribe_fd, &dummy);
182 pthread_mutex_lock(&handle->data_queue_mutex);
183 if (!handle->data_queue) {
184 pthread_mutex_unlock(&handle->data_queue_mutex);
188 GList *cursor = g_list_first(handle->data_queue);
189 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
// The queue lock is released before the user callback runs.
191 pthread_mutex_unlock(&handle->data_queue_mutex);
195 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
196 if (cb_data == NULL) {
/* NOTE(review): this logs `call_id`, while the lookup key above is
 * data->call_id - confirm which variable is intended. */
197 VERBOSE("unable to find %s", call_id);
200 cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
// Unlink the processed event from the queue and release it.
202 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
203 __sub_data_free(data);
204 pthread_mutex_unlock(&handle->data_queue_mutex);
/* GIOChannel watch callback for subscribe_fd. `data` is the pims_ipc_s handle
 * given to g_io_add_watch(). Dispatches pending events while holding
 * __gmutex, after verifying the handle is still registered. */
210 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
212 pims_ipc_s *handle = (pims_ipc_s *)data;
216 if (condition & G_IO_HUP)
219 pthread_mutex_lock(&__gmutex);
221 // check whether the subscriber handle still exists
222 if (g_list_find(subscribe_handles, handle) == NULL) {
223 ERROR("No such handle that ID is %p", handle);
224 pthread_mutex_unlock(&__gmutex);
228 __pims_ipc_receive_for_subscribe(handle);
230 pthread_mutex_unlock(&__gmutex);
/* Return the file-wide sequence number used when building client ids. The
 * value 0xffffffff marks "not yet seeded"; on first use the counter is seeded
 * from the current time. */
235 static unsigned int __get_global_sequence_no()
237 static unsigned int __gsequence_no = 0xffffffff;
239 if (__gsequence_no == 0xffffffff)
240 __gsequence_no = (unsigned int)time(NULL);
243 return __gsequence_no;
/* Send the identify message that introduces this client to the server.
 * Wire layout, all integers as native unsigned int:
 *   [total length][client id length][client id bytes][sequence number]
 * Returns the result of socket_send(). */
246 static int __pims_ipc_send_identify(pims_ipc_s *handle)
248 unsigned int sequence_no;
249 unsigned int client_id_len = strlen(handle->id);
250 unsigned int len = sizeof(unsigned int) // total size
251 + client_id_len + sizeof(unsigned int) // client_id
252 + sizeof(unsigned int) ; // seq_no
256 memset(buf, 0x0, len+1);
// total length
259 memcpy(buf, (void*)&len, sizeof(unsigned int));
260 length += sizeof(unsigned int);
// client id, length-prefixed and without a terminating NUL
263 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
264 length += sizeof(unsigned int);
265 memcpy(buf+length, (void*)(handle->id), client_id_len);
266 length += client_id_len;
// sequence number; this advances handle->call_sequence_no
269 GET_CALL_SEQUNECE_NO(handle, sequence_no);
270 memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
271 length += sizeof(unsigned int);
273 return socket_send(handle->fd, buf, length);
/* Read one response message from handle->fd and, when its sequence number
 * matches the handle's current call_sequence_no, hand the payload to the
 * caller through data_out.
 * Message layout as read below:
 *   [total length][client id length][client id][sequence number]
 *   and, unless the message ends there,
 *   [call id length][call id][is_data flag][data length][data]
 * NOTE(review): read_len is unsigned and accumulates the raw results of
 * read(); a -1 result would wrap - confirm errors are handled before the
 * length comparison. */
276 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
279 gboolean is_ok = FALSE;
281 pims_ipc_data_h data = NULL;
282 unsigned int sequence_no = 0;
283 char *client_id = NULL;
284 char *call_id = NULL;
287 /* read the size of message. note that ioctl is non-blocking */
288 if (ioctl(handle->fd, FIONREAD, &len)) {
289 ERROR("ioctl failed: %d", errno);
293 /* when server or client closed socket */
295 ERROR("[IPC Socket] connection is closed");
300 unsigned int read_len = 0;
301 unsigned int total_len = 0;
302 unsigned int client_id_len = 0;
303 unsigned int call_id_len = 0;
304 unsigned int is_data = FALSE;
307 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
310 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int)));
// The upper bound keeps client_id_len+1 from wrapping in the allocation.
311 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
312 client_id = calloc(1, client_id_len+1);
313 if (client_id == NULL) {
314 ERROR("calloc fail");
320 ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len);
321 if (ret < 0) { ERROR("socket_recv error"); break; }
325 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int)));
/* Nothing follows the sequence number: build a data handle that carries only
 * the client id. */
326 if (total_len == read_len) {
328 data = pims_ipc_data_create(0);
330 ERROR("pims_ipc_data_create() Fail");
333 ret = pims_ipc_data_put(data, client_id, client_id_len);
335 WARNING("pims_ipc_data_put fail(%d)", ret);
339 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
340 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
341 call_id = calloc(1, call_id_len+1);
342 if (call_id == NULL) {
343 ERROR("calloc fail");
350 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
351 if (ret < 0) { ERROR("socket_recv error"); break; }
354 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
356 unsigned int data_len;
357 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
358 if (data_len > 0 && data_len < UINT_MAX-1) {
359 buf = calloc(1, data_len+1);
361 ERROR("calloc fail");
367 ret = socket_recv(handle->fd, (void *)&(buf), data_len);
368 if (ret < 0) { ERROR("socket_recv error"); break; }
// Turn the received buffer into a data handle.
371 data = pims_ipc_data_steal_unmarshal(buf, data_len);
373 ERROR("pims_ipc_data_steal_unmarshal() Fail");
379 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no);
// Only a response to the most recent request is accepted.
385 if (sequence_no == handle->call_sequence_no) {
386 if (data_out != NULL) {
390 pims_ipc_data_destroy(data);
395 pims_ipc_data_destroy(data);
396 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
/* Wait for a response on handle->fd with poll() and read it with
 * __pims_ipc_read_data(). data_out is forwarded unchanged. */
405 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
// One pollfd entry for the server socket.
408 struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd));
409 if (NULL == pollfds) {
410 ERROR("calloc() Fail");
414 pollfds[0].fd = handle->fd;
415 pollfds[0].events = POLLIN | POLLERR | POLLHUP;
// Each poll() waits at most 1000 ms.
419 ret = poll(pollfds, 1, 1000);
// Interrupted or would-block results are singled out from hard errors.
420 if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
427 if (pollfds[0].revents & (POLLERR|POLLHUP)) {
428 ERROR("Server disconnected");
432 if (pollfds[0].revents & POLLIN) {
433 ret = __pims_ipc_read_data(handle, data_out);
/* Create the eventfd the I/O thread uses to wake the main loop, switch it to
 * non-blocking mode and store it in handle->subscribe_fd. */
442 static int __open_subscribe_fd(pims_ipc_s *handle)
444 // router inproc eventfd
445 int subscribe_fd = eventfd(0,0);
449 if (-1 == subscribe_fd) {
450 ERROR("eventfd error : %d", errno);
453 VERBOSE("subscribe :%d\n", subscribe_fd);
// Add O_NONBLOCK to the existing file status flags.
455 flags = fcntl (subscribe_fd, F_GETFL, 0);
458 ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK);
460 VERBOSE("subscribe fcntl : %d\n", ret);
462 handle->subscribe_fd = subscribe_fd;
/* I/O-thread side of subscription delivery: read one published event from
 * handle->fd, append it to data_queue and wake the main loop through
 * subscribe_fd.
 * Message layout as read below:
 *   [total length][call id length][call id][is_data flag][data length][data] */
466 static int __subscribe_data(pims_ipc_s * handle)
470 char *call_id = NULL;
472 pims_ipc_data_h dhandle = NULL;
475 /* read the size of message. note that ioctl is non-blocking */
476 if (ioctl(handle->fd, FIONREAD, &len)) {
477 ERROR("ioctl failed: %d", errno);
481 /* when server or client closed socket */
483 INFO("[IPC Socket] connection is closed");
488 unsigned int read_len = 0;
489 unsigned int total_len = 0;
490 unsigned int call_id_len = 0;
491 unsigned int is_data = FALSE;
494 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
497 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
// The upper bound keeps call_id_len+1 from wrapping in the allocation.
498 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
499 call_id = calloc(1, call_id_len+1);
500 if (call_id == NULL) {
501 ERROR("calloc fail");
508 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
509 if (ret < 0) { ERROR("socket_recv error"); break; }
513 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
516 unsigned int data_len;
517 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
518 if (data_len > 0 && data_len < UINT_MAX-1) {
519 buf = calloc(1, data_len+1);
521 ERROR("calloc fail");
527 ret = socket_recv(handle->fd, (void *)&(buf), data_len);
529 ERROR("socket_recv error");
// Turn the received buffer into a data handle.
534 dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
535 if (NULL == dhandle) {
536 ERROR("pims_ipc_data_steal_unmarshal() Fail");
541 pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s));
542 if (NULL == sub_data) {
543 ERROR("calloc() Fail");
544 pims_ipc_data_destroy(dhandle);
// The queue entry takes over both the payload and the call id string.
548 sub_data->handle = dhandle;
549 sub_data->call_id = call_id;
552 pthread_mutex_lock(&handle->data_queue_mutex);
553 handle->data_queue = g_list_append(handle->data_queue, sub_data);
554 pthread_mutex_unlock(&handle->data_queue_mutex);
// Signal the watch on subscribe_fd that an event is queued.
555 write_command(handle->subscribe_fd, 1);
/* Idle callback scheduled by the I/O thread when reading from the server
 * fails. `data` is the affected handle; every registration in
 * disconnected_list made for that handle has its callback invoked.
 * NOTE(review): the emptiness check below reads disconnected_list before
 * __disconnect_cb_mutex is taken - confirm this is intentional. */
565 static gboolean __hung_up_cb(gpointer data)
567 GList *cursor = NULL;
569 if (NULL == disconnected_list) {
570 DEBUG("No disconnected list");
574 pthread_mutex_lock(&__disconnect_cb_mutex);
575 cursor = g_list_first(disconnected_list);
577 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
// User callbacks run while __disconnect_cb_mutex is held.
578 if (disconnected && disconnected->handle == data && disconnected->callback) {
579 DEBUG("call hung_up callback");
580 disconnected->callback(disconnected->user_data);
583 cursor = g_list_next(cursor);
585 pthread_mutex_unlock(&__disconnect_cb_mutex);
/* Worker thread of a subscriber handle. `data` is the pims_ipc_s handle.
 * It watches handle->fd with epoll, feeds incoming events to
 * __subscribe_data(), and exits once epoll_stop_thread is set. */
590 static void* __io_thread(void *data)
592 pims_ipc_s *handle = data;
593 struct epoll_event ev = {0};
597 epfd = epoll_create(MAX_EPOLL_EVENT);
599 pthread_mutex_lock(&__gmutex);
// Register the server socket for readable and hang-up events.
601 ev.events = EPOLLIN | EPOLLHUP;
602 ev.data.fd = handle->fd;
604 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
605 WARN_IF(ret != 0, "listen error :%d", ret);
606 pthread_mutex_unlock(&__gmutex);
// The stop flag is checked under __gmutex before waiting...
612 pthread_mutex_lock(&__gmutex);
614 if (handle->epoll_stop_thread) {
615 pthread_mutex_unlock(&__gmutex);
618 pthread_mutex_unlock(&__gmutex);
620 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
// A 50 ms timeout bounds how long a stop request can go unnoticed.
621 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
// ...and checked again after the wait returns.
623 pthread_mutex_lock(&__gmutex);
625 if (handle->epoll_stop_thread) {
626 pthread_mutex_unlock(&__gmutex);
629 pthread_mutex_unlock(&__gmutex);
631 if (event_num == -1) {
632 if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
633 ERROR("errno:%d\n", errno);
// Events are processed while holding __gmutex.
638 pthread_mutex_lock(&__gmutex);
639 for (i = 0; i < event_num; i++) {
640 if (events[i].events & EPOLLHUP) {
641 ERROR("server fd closed");
642 handle->epoll_stop_thread = true;
646 if (events[i].events & EPOLLIN) {
/* A read failure is treated as a server disconnect: schedule the
 * disconnected callbacks via g_idle_add() and stop this thread. */
647 if(__subscribe_data(handle) < 0) {
648 ERROR("server fd closed");
649 g_idle_add(__hung_up_cb, handle);
650 handle->epoll_stop_thread = true;
655 pthread_mutex_unlock(&__gmutex);
/* GIOChannel watch callback installed on the request socket to detect a
 * server disconnect. `data` is the pims_ipc_s handle. */
663 static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data)
665 if (G_IO_HUP & condition) {
669 } else if (G_IO_IN & condition) {
/* Peek without consuming: a 0 result from recv() on a stream socket means the
 * peer has closed the connection. */
671 if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) {
677 ERROR("Invalid condition (%d)", condition);
/* Create a handle connected to the UNIX-domain socket at `service`.
 * In PIMS_IPC_MODE_REQ the client identifies itself and issues the internal
 * CREATE call; otherwise the handle is set up for subscription with an
 * eventfd, an I/O thread and a main-loop watch. Setup runs under __gmutex. */
682 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
684 pims_ipc_s *handle = NULL;
685 gboolean is_ok = FALSE;
687 pthread_mutex_lock(&__gmutex);
690 struct sockaddr_un server_addr;
694 VERBOSE("Create %d th..", ref_cnt);
696 handle = g_new0(pims_ipc_s, 1);
697 if (handle == NULL) {
698 ERROR("Failed to allocation");
702 handle->subscribe_fd = -1;
703 handle->io_thread = 0;
704 handle->service = g_strdup(service);
// Client id is "<pid>:<sequence>" in hexadecimal.
705 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
706 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
707 if (handle->fd < 0) {
708 ERROR("socket error : %d, errno: %d", handle->fd, errno);
// Switch the socket to non-blocking mode.
711 int flags = fcntl (handle->fd, F_GETFL, 0);
714 ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK);
715 VERBOSE("socket fcntl : %d\n", ret);
717 pthread_mutex_init(&handle->call_status_mutex, 0);
719 pthread_mutex_lock(&handle->call_status_mutex);
720 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
721 pthread_mutex_unlock(&handle->call_status_mutex);
723 bzero(&server_addr, sizeof(server_addr));
724 server_addr.sun_family = AF_UNIX;
// snprintf truncates a service path longer than sun_path.
725 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
727 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
729 ERROR("connect error : %d, errno: %d", ret, errno);
732 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
734 if (mode == PIMS_IPC_MODE_REQ) {
// Watch the socket so a server disconnect is noticed by the main loop.
735 GIOChannel *ch = g_io_channel_unix_new(handle->fd);
736 handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP, _g_io_hup_cb, handle);
738 handle->call_sequence_no = (unsigned int)time(NULL);
739 ret = __pims_ipc_send_identify(handle);
741 ERROR("__pims_ipc_send_identify error");
// Consume the identify response; the return value is ignored here.
744 __pims_ipc_receive(handle, NULL);
746 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) {
747 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
// Subscriber setup: event queue, eventfd, I/O thread and main-loop watch.
751 handle->epoll_stop_thread = false;
752 pthread_mutex_init(&handle->data_queue_mutex, 0);
754 pthread_mutex_lock(&handle->data_queue_mutex);
755 handle->data_queue = NULL;
756 pthread_mutex_unlock(&handle->data_queue_mutex);
758 ret = __open_subscribe_fd(handle);
763 ret = pthread_create(&worker, NULL, __io_thread, handle);
766 handle->io_thread = worker;
768 GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
769 if (!async_channel) {
770 ERROR("g_io_channel_unix_new error");
773 handle->async_channel = async_channel;
774 handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
// The table frees both its keys and its values with g_free.
775 handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
776 ASSERT(handle->subscribe_cb_table);
778 // add a subscriber handle to the global list
779 subscribe_handles = g_list_append(subscribe_handles, handle);
780 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
784 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
787 pthread_mutex_unlock(&__gmutex);
/* Cleanup runs after __gmutex is released because __pims_ipc_free_handle()
 * takes that lock itself. */
789 if (FALSE == is_ok) {
791 __pims_ipc_free_handle(handle);
/* Public entry point: create a request/response handle for `service`.
 * Returns whatever __pims_ipc_create() returns. */
799 API pims_ipc_h pims_ipc_create(char *service)
801 return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
/* Public entry point: create a subscriber handle for `service`.
 * Returns whatever __pims_ipc_create() returns. */
804 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
806 return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
/* Destroy a handle. In request mode the server is first told through the
 * internal DESTROY call; a failure of that call is only logged and the handle
 * is freed regardless. */
809 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
811 pims_ipc_s *handle = (pims_ipc_s *)ipc;
813 if (mode == PIMS_IPC_MODE_REQ) {
814 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) {
815 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
820 __pims_ipc_free_handle(handle);
/* Public entry point: destroy a handle created with pims_ipc_create(). */
823 API void pims_ipc_destroy(pims_ipc_h ipc)
825 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
/* Public entry point: destroy a handle created with
 * pims_ipc_create_for_subscribe(). */
828 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
830 __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
/* Serialize and send one request for module/function.
 * Header layout, all integers as native unsigned int:
 *   [total length][client id length][client id][sequence number]
 *   [call id length][call id][is_data flag]
 * followed, when data_in is supplied, by [data length] and the payload bytes,
 * which are sent separately with socket_send_data(). */
833 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
836 unsigned int sequence_no = 0;
837 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
838 unsigned int call_id_len = strlen(call_id);
839 pims_ipc_data_s *data = NULL;
840 unsigned int is_data = FALSE;
841 unsigned int client_id_len = strlen(handle->id);
// Advances handle->call_sequence_no; the response must echo this value.
844 GET_CALL_SEQUNECE_NO(handle, sequence_no);
// Size of the header without any payload.
846 int len = sizeof(unsigned int) // total size
847 + client_id_len + sizeof(unsigned int) // client_id
848 + sizeof(unsigned int) // seq_no
849 + call_id_len + sizeof(unsigned int) // call_id
850 + sizeof(unsigned int); // is data
// With a payload: the header grows by the data-length field and the total
// additionally covers the payload bytes.
856 data = (pims_ipc_data_s*)data_in;
857 len += sizeof(unsigned int);
858 total_len = len + data->buf_size;
861 INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no);
865 memset(buf, 0x0, len+1);
// total length
867 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
868 length += sizeof(unsigned int);
// client id, length-prefixed
871 client_id_len = strlen(handle->id);
872 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
873 length += sizeof(unsigned int);
874 memcpy(buf+length, (void*)(handle->id), client_id_len);
875 length += client_id_len;
// sequence number
878 memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
879 length += sizeof(unsigned int);
// call id, length-prefixed
882 memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
883 length += sizeof(unsigned int);
884 memcpy(buf+length, (void*)(call_id), call_id_len);
885 length += call_id_len;
// is_data flag
889 memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
890 length += sizeof(unsigned int);
// payload length
893 memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int));
894 length += sizeof(unsigned int);
// Header first, then the payload bytes.
896 ret = socket_send(handle->fd, buf, length);
898 ret = socket_send_data(handle->fd, data->buf, data->buf_size);
// Header-only send.
901 ret = socket_send(handle->fd, buf, length);
/* Synchronous call: send a request for module/function and wait for its
 * response. data_in may carry a request payload; data_out is forwarded to
 * __pims_ipc_receive(). The call is rejected while call_status is not READY.
 * NOTE(review): the visible code checks call_status but does not set it to
 * IN_PROGRESS for the duration of the call - confirm concurrent synchronous
 * calls on one handle are excluded elsewhere. */
910 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
911 pims_ipc_data_h *data_out)
913 pims_ipc_s *handle = (pims_ipc_s *)ipc;
917 ERROR("invalid handle : %p", ipc);
921 if (!module || !function) {
922 ERROR("invalid argument");
926 pthread_mutex_lock(&handle->call_status_mutex);
927 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
928 pthread_mutex_unlock(&handle->call_status_mutex);
929 ERROR("the previous call is in progress : %p", ipc);
932 pthread_mutex_unlock(&handle->call_status_mutex);
935 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
939 if (__pims_ipc_receive(handle, data_out) != 0) {
/* Idle callback that delivers an async response parked in
 * dhandle_for_async_idler. `data` is the pims_ipc_s handle. */
946 static gboolean __call_async_idler_cb(gpointer data)
950 pims_ipc_s *handle = (pims_ipc_s *)data;
// Take the parked response and clear the slot.
952 ASSERT(handle->dhandle_for_async_idler);
953 pims_ipc_data_h dhandle = handle->dhandle_for_async_idler;
954 handle->dhandle_for_async_idler = NULL;
// Status returns to READY before the user callback runs.
956 pthread_mutex_lock(&handle->call_status_mutex);
957 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
958 pthread_mutex_unlock(&handle->call_status_mutex);
960 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
// The response is destroyed here after the callback returns.
961 pims_ipc_data_destroy(dhandle);
/* Receive the response to an async call and deliver it. Used both as a
 * GIOChannel watch callback and as a direct call from pims_ipc_call_async()
 * with src == NULL; `data` is the pims_ipc_s handle. */
966 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
968 pims_ipc_s *handle = (pims_ipc_s *)data;
969 pims_ipc_data_h dhandle = NULL;
971 if (__pims_ipc_receive(handle, &dhandle) == 0) {
972 VERBOSE("call status = %d", handle->call_status);
// A response that arrives while no async call is pending is discarded.
974 pthread_mutex_lock(&handle->call_status_mutex);
975 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
976 pthread_mutex_unlock(&handle->call_status_mutex);
977 pims_ipc_data_destroy(dhandle);
980 pthread_mutex_unlock(&handle->call_status_mutex);
// Direct invocation: park the response and deliver it from an idle callback.
981 if (src == NULL) { // A response is arrived too quickly
982 handle->dhandle_for_async_idler = dhandle;
983 g_idle_add(__call_async_idler_cb, handle);
// Watch invocation: mark READY, call the user callback, destroy the response.
986 pthread_mutex_lock(&handle->call_status_mutex);
987 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
988 pthread_mutex_unlock(&handle->call_status_mutex);
990 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
991 pims_ipc_data_destroy(dhandle);
/* Asynchronous call: send a request for module/function and arrange for
 * `callback` to be invoked with the response and `userdata`. module, function
 * and callback must be non-NULL. Rejected while a call is already pending.
 * NOTE(review): the READY check and the switch to IN_PROGRESS below are two
 * separate critical sections, so two threads could both pass the check -
 * confirm whether they should be a single one. */
998 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
999 pims_ipc_call_async_cb callback, void *userdata)
1001 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1002 guint source_id = 0;
1005 ERROR("invalid handle : %p", ipc);
1009 if (!module || !function || !callback) {
1010 ERROR("invalid argument");
1014 pthread_mutex_lock(&handle->call_status_mutex);
1015 if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
1016 pthread_mutex_unlock(&handle->call_status_mutex);
1017 ERROR("the previous call is in progress : %p", ipc);
1020 pthread_mutex_unlock(&handle->call_status_mutex);
1022 pthread_mutex_lock(&handle->call_status_mutex);
1023 handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
1024 pthread_mutex_unlock(&handle->call_status_mutex);
1026 handle->call_async_callback = callback;
1027 handle->call_async_userdata = userdata;
1029 // add a callback for GIOChannel
// The channel on the socket is created on first use and then reused.
1030 if (!handle->async_channel) {
1031 handle->async_channel = g_io_channel_unix_new(handle->fd);
1032 if (!handle->async_channel) {
1033 ERROR("g_io_channel_unix_new error");
1038 source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
1039 handle->async_source_id = source_id;
// If sending fails, the watch that was just installed is removed again.
1041 if (__pims_ipc_send(handle, module, function, data_in) != 0) {
1042 g_source_remove(source_id);
// Run the handler once directly with src == NULL; a response received this way
// is delivered from an idle callback.
1046 __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
/* Report whether the handle's call_status is IN_PROGRESS. The status is read
 * under call_status_mutex. */
1051 API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1054 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1057 ERROR("invalid handle : %p", ipc);
1061 pthread_mutex_lock(&handle->call_status_mutex);
1062 if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1066 pthread_mutex_unlock(&handle->call_status_mutex);
/* Register `callback` (with `userdata`) for the event identified by
 * module/event. Requires a handle that has a subscribe_cb_table; module,
 * event and callback must be non-NULL. */
1070 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
1072 gchar *call_id = NULL;
1073 pims_ipc_cb_s *cb_data = NULL;
1074 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1076 if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1077 ERROR("invalid handle : %p", ipc);
1081 if (!module || !event || !callback) {
1082 ERROR("invalid argument");
1086 cb_data = g_new0(pims_ipc_cb_s, 1);
1087 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1089 VERBOSE("subscribe cb id[%s]", call_id);
1090 cb_data->callback = callback;
1091 cb_data->user_data = userdata;
// The table now holds call_id and cb_data; it was created with g_free as the
// destroy function for both keys and values.
1092 g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
/* Remove the callback registered for module/event. Requires a handle that has
 * a subscribe_cb_table; module and event must be non-NULL. */
1097 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1099 gchar *call_id = NULL;
1100 pims_ipc_s *handle = (pims_ipc_s *)ipc;
1102 if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1103 ERROR("invalid handle : %p", ipc);
1107 if (!module || !event) {
1108 ERROR("invalid argument");
// Build the same key that pims_ipc_subscribe() used for the entry.
1112 call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1114 VERBOSE("unsubscribe cb id[%s]", call_id);
// g_hash_table_remove() returns TRUE only if the key was present.
1116 if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1117 ERROR("g_hash_table_remove error");
/* Register a server-disconnected callback for `handle`. A second registration
 * for the same handle is rejected.
 * NOTE(review): the duplicate check and the append are performed under two
 * separate acquisitions of __disconnect_cb_mutex, so concurrent callers could
 * both pass the check - confirm whether that matters for callers. */
1126 API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle, pims_ipc_server_disconnected_cb callback, void *user_data)
1128 GList *cursor = NULL;
1130 /* check already existed */
1131 pthread_mutex_lock(&__disconnect_cb_mutex);
1132 cursor = g_list_first(disconnected_list);
1134 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1135 if (disconnected && disconnected->handle == handle) {
1136 ERROR("Already set callback");
1137 pthread_mutex_unlock(&__disconnect_cb_mutex);
1140 cursor = g_list_next(cursor);
1142 pthread_mutex_unlock(&__disconnect_cb_mutex);
1144 /* append callback */
1145 pims_ipc_server_disconnected_cb_t *disconnected = NULL;
1146 disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t));
1147 if (NULL == disconnected) {
1148 ERROR("Calloc() Fail");
1151 DEBUG("add disconnected");
1152 disconnected->handle = handle;
1153 disconnected->callback = callback;
1154 disconnected->user_data = user_data;
1156 pthread_mutex_lock(&__disconnect_cb_mutex);
1157 disconnected_list = g_list_append(disconnected_list, disconnected);
1158 pthread_mutex_unlock(&__disconnect_cb_mutex);
/* Remove the server-disconnected registration made for `handle`. The list is
 * searched and modified while holding __disconnect_cb_mutex. */
1163 API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle)
1165 pthread_mutex_lock(&__disconnect_cb_mutex);
1167 GList *cursor = NULL;
1168 cursor = g_list_first(disconnected_list);
1170 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1171 if (disconnected && disconnected->handle == handle) {
// Unlink the matching node from the list.
1173 disconnected_list = g_list_delete_link(disconnected_list, cursor);
1174 DEBUG("remove disconnected_cb");
1177 cursor = g_list_next(cursor);
1179 pthread_mutex_unlock(&__disconnect_cb_mutex);
1184 /* start deprecated */
/* Deprecated: clear the single global server-disconnected callback and its
 * user data. The update is made under __disconnect_cb_mutex. */
1185 API int pims_ipc_unset_server_disconnected_cb()
1187 pthread_mutex_lock(&__disconnect_cb_mutex);
1188 _server_disconnected_cb.callback = NULL;
1189 _server_disconnected_cb.user_data = NULL;
1190 pthread_mutex_unlock(&__disconnect_cb_mutex);
/* Deprecated: store `callback` and `user_data` as the single global
 * server-disconnected callback, replacing any previous value. The update is
 * made under __disconnect_cb_mutex. */
1194 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data)
1196 pthread_mutex_lock(&__disconnect_cb_mutex);
1197 _server_disconnected_cb.callback = callback;
1198 _server_disconnected_cb.user_data = user_data;
1199 pthread_mutex_unlock(&__disconnect_cb_mutex);
1202 /* end deprecated */