23bcdc78068014a581d20dadc089cfd3cf0aeea0
[platform/core/pim/pims-ipc.git] / src / pims-ipc.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <time.h>
24 #include <glib.h>
25 #include <stdint.h>
26 #include <pthread.h>
27 #include <poll.h> // pollfds
28 #include <sys/un.h> // sockaddr_un
29 #include <sys/ioctl.h> // ioctl
30 #include <sys/socket.h> //socket
31 #include <sys/types.h>
32 #include <sys/epoll.h> // epoll
33 #include <sys/eventfd.h> // eventfd
34 #include <fcntl.h>
35 #include <errno.h>
36
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-debug.h"
40 #include "pims-ipc-data.h"
41 #include "pims-ipc-data-internal.h"
42 #include "pims-ipc.h"
43
/*
 * Pre-increment the call_sequence_no field of `handle` and store the new
 * value in `sequence_no`. The macro itself takes no lock.
 */
#define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
	sequence_no = ++((handle)->call_sequence_no);\
} while (0)
47
/* File-wide lock taken by __pims_ipc_create(), __pims_ipc_free_handle() and the subscribe watch handler. */
static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
49
/*
 * State of a handle with respect to outgoing calls.
 * pims_ipc_call() refuses to run unless the handle is READY.
 */
typedef enum
{
	PIMS_IPC_CALL_STATUS_READY = 0,	/* set when the handle is created */
	PIMS_IPC_CALL_STATUS_IN_PROGRESS	/* NOTE(review): not assigned in this excerpt; presumably used by the async call path */
} pims_ipc_call_status_e;
55
/* Selects which kind of handle __pims_ipc_create()/__pims_ipc_destroy() operate on. */
typedef enum
{
	PIMS_IPC_MODE_REQ = 0,	/* request handle: identifies itself and issues calls */
	PIMS_IPC_MODE_SUB	/* subscribe handle: gets an eventfd, an I/O thread and a GLib watch */
} pims_ipc_mode_e;
61
/*
 * A subscription callback together with its user data. Instances are looked
 * up in pims_ipc_s.subscribe_cb_table by call id and invoked when a queued
 * event is dispatched.
 */
typedef struct
{
	pims_ipc_subscribe_cb callback;	/* invoked as callback(ipc, data, user_data) */
	void * user_data;	/* passed back to callback unchanged */
} pims_ipc_cb_s;
67
/*
 * One event received by the I/O thread and queued on pims_ipc_s.data_queue
 * until the main loop dispatches it. Released with __sub_data_free().
 */
typedef struct
{
	char *call_id;	/* heap string naming the event; released with free() */
	/* NOTE(review): declared as a pointer to pims_ipc_data_h, but it is assigned a
	 * pims_ipc_data_h and passed to pims_ipc_data_destroy() as one — confirm the intended type. */
	pims_ipc_data_h *handle;
}pims_ipc_subscribe_data_s;
73
/*
 * Client-side IPC handle. The first group of fields is used by every handle;
 * the second and third groups are only populated for subscribe-mode handles.
 */
typedef struct
{
	int fd;	/* AF_UNIX stream socket connected to the service */
	char *service;	/* g_strdup() copy of the service socket path */
	char *id;	/* client id string formatted as "<pid hex>:<sequence hex>" */
	GIOChannel *async_channel;	/* GLib channel wrapping subscribe_fd (subscribe mode) */
	guint async_source_id;	/* source id returned by g_io_add_watch() for async_channel */
	pthread_mutex_t call_status_mutex;	/* guards call_status */
	pims_ipc_call_status_e call_status;
	unsigned int call_sequence_no;	/* bumped for every outgoing message; replies must echo it */
	pims_ipc_call_async_cb call_async_callback;	/* NOTE(review): async call support; usage not visible in this excerpt */
	void *call_async_userdata;
	pims_ipc_data_h dhandle_for_async_idler;

	int subscribe_fd;	/* eventfd the I/O thread writes to in order to wake the main loop; -1 when unused */
	int epoll_stop_thread;	/* non-zero asks the I/O thread to leave its loop */
	pthread_t io_thread;	/* 0 when no I/O thread was started */
	GHashTable *subscribe_cb_table;	/* call id string -> pims_ipc_cb_s */

	pthread_mutex_t data_queue_mutex;	/* guards data_queue */
	GList *data_queue;	/* list of pims_ipc_subscribe_data_s awaiting dispatch */
} pims_ipc_s;
96
/* Count of handles; incremented in __pims_ipc_create() and decremented in __pims_ipc_free_handle(). */
static unsigned int ref_cnt;
/* Subscribe-mode handles that are currently alive; checked by the subscribe watch handler before dispatching. */
static GList *subscribe_handles;
99
/* Holder for the "server disconnected" notification callback and its user data. */
typedef struct {
	pims_ipc_server_disconnected_cb callback;	/* may be NULL when nothing is registered */
	void *user_data;	/* handed to callback unchanged */
} pims_ipc_server_disconnected_cb_t;
104
/* Callback invoked by __hung_up_cb(); where it gets registered is outside this excerpt. */
static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
/* Held while _server_disconnected_cb is read and invoked in __hung_up_cb(). */
static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
107
108 static void __sub_data_free(gpointer user_data)
109 {
110         pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
111         pims_ipc_data_destroy(data->handle);
112         free(data->call_id);
113         free(data);
114 }
115
116 static void __pims_ipc_free_handle(pims_ipc_s *handle)
117 {
118         pthread_mutex_lock(&__gmutex);
119
120         handle->epoll_stop_thread = true;
121
122         if (handle->fd != -1)
123                 close(handle->fd);
124
125         if (handle->io_thread)
126                 pthread_join(handle->io_thread, NULL);
127
128         g_free(handle->id);
129         g_free(handle->service);
130
131         if (handle->async_channel) {
132                 // remove a subscriber handle from the golbal list
133                 subscribe_handles = g_list_remove(subscribe_handles, handle);
134                 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
135
136                 g_source_remove(handle->async_source_id);
137                 g_io_channel_unref(handle->async_channel);
138         }
139
140         if (handle->subscribe_cb_table)
141                 g_hash_table_destroy(handle->subscribe_cb_table);
142
143         pthread_mutex_lock(&handle->data_queue_mutex);
144         if (handle->data_queue) {
145                 g_list_free_full(handle->data_queue, __sub_data_free);
146         }
147         pthread_mutex_unlock(&handle->data_queue_mutex);
148         pthread_mutex_destroy(&handle->data_queue_mutex);
149
150         if (handle->subscribe_fd != -1)
151                 close(handle->subscribe_fd);
152
153         pthread_mutex_destroy(&handle->call_status_mutex);
154
155         g_free(handle);
156
157         if (--ref_cnt <= 0) {
158                 if (subscribe_handles)
159                         g_list_free(subscribe_handles);
160                 subscribe_handles = NULL;
161         }
162
163         pthread_mutex_unlock(&__gmutex);
164 }
165
166 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
167 {
168         pims_ipc_cb_s *cb_data = NULL;
169         uint64_t dummy;
170
171         do {
172                 read_command(handle->subscribe_fd, &dummy);
173
174                 pthread_mutex_lock(&handle->data_queue_mutex);
175                 if (!handle->data_queue) {
176                         pthread_mutex_unlock(&handle->data_queue_mutex);
177                         break;
178                 }
179
180                 GList *cursor = g_list_first(handle->data_queue);
181                 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
182                 if (data == NULL) {
183                         pthread_mutex_unlock(&handle->data_queue_mutex);
184                         break;
185                 }
186
187                 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
188                 if (cb_data == NULL) {
189                         VERBOSE("unable to find %s", call_id);
190                 }
191                 else
192                         cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
193
194                 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
195                 __sub_data_free(data);
196                 pthread_mutex_unlock(&handle->data_queue_mutex);
197         } while(1);
198
199         return 0;
200 }
201
202 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
203 {
204         pims_ipc_s *handle = (pims_ipc_s *)data;
205
206         VERBOSE("");
207
208         if (condition & G_IO_HUP)
209                 return FALSE;
210
211         pthread_mutex_lock(&__gmutex);
212
213         // check if a subscriber handle is exists
214         if (g_list_find(subscribe_handles, handle) == NULL) {
215                 ERROR("No such handle that ID is %p", handle);
216                 pthread_mutex_unlock(&__gmutex);
217                 return FALSE;
218         }
219
220         __pims_ipc_receive_for_subscribe(handle);
221
222         pthread_mutex_unlock(&__gmutex);
223
224         return TRUE;
225 }
226
/*
 * Hand out a process-wide sequence number.
 *
 * The counter starts at the sentinel 0xffffffff. Whenever it holds the
 * sentinel it is (re)seeded from time(NULL); otherwise it is incremented by
 * one. The updated value is returned.
 */
static unsigned int __get_global_sequence_no()
{
	static unsigned int __gsequence_no = 0xffffffff;
	const unsigned int unseeded = 0xffffffff;

	__gsequence_no = (__gsequence_no == unseeded)
		? (unsigned int)time(NULL)
		: __gsequence_no + 1;

	return __gsequence_no;
}
237
238 static int __pims_ipc_send_identify(pims_ipc_s *handle)
239 {
240         unsigned int sequence_no;
241         unsigned int client_id_len = strlen(handle->id);
242         unsigned int len = sizeof(unsigned int)         // total size
243                 + client_id_len + sizeof(unsigned int)          // client_id
244                 + sizeof(unsigned int)  ;       // seq_no
245
246         char buf[len+1];
247         int length = 0;
248         memset(buf, 0x0, len+1);
249
250         // total len
251         memcpy(buf, (void*)&len, sizeof(unsigned int));
252         length += sizeof(unsigned int);
253
254         // client_id
255         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
256         length += sizeof(unsigned int);
257         memcpy(buf+length, (void*)(handle->id), client_id_len);
258         length += client_id_len;
259
260         // seq_no
261         GET_CALL_SEQUNECE_NO(handle, sequence_no);
262         memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
263         length += sizeof(unsigned int);
264
265         return socket_send(handle->fd, buf, length);
266 }
267
268 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
269 {
270         int ret;
271         gboolean is_ok = FALSE;
272         int len = 0;
273         pims_ipc_data_h data = NULL;
274         unsigned int sequence_no = 0;
275         char *client_id = NULL;
276         char *call_id = NULL;
277         char *buf = NULL;
278
279         /* read the size of message. note that ioctl is non-blocking */
280         if (ioctl(handle->fd, FIONREAD, &len)) {
281                 ERROR("ioctl failed: %d", errno);
282                 return -1;
283         }
284
285         /* when server or client closed socket */
286         if (len == 0) {
287                 ERROR("[IPC Socket] connection is closed");
288                 return -1;
289         }
290
291         do {
292                 unsigned int read_len = 0;
293                 unsigned int total_len = 0;
294                 unsigned int client_id_len = 0;
295                 unsigned int call_id_len = 0;
296                 unsigned int is_data = FALSE;
297
298                 // get total_len
299                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
300
301                 // client_id
302                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int)));
303                 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
304                         client_id = calloc(1, client_id_len+1);
305                         if (client_id == NULL) {
306                                 ERROR("calloc fail");
307                                 break;
308                         }
309                 }
310                 else
311                         break;
312                 ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len);
313                 if (ret < 0) {  ERROR("socket_recv error"); break;      }
314                 read_len += ret;
315
316                 // sequence no
317                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int)));
318                 if (total_len == read_len) {
319                         // send identity
320                         data = pims_ipc_data_create(0);
321                         if (NULL == data) {
322                                 ERROR("pims_ipc_data_create() Fail");
323                                 break;
324                         }
325                         ret = pims_ipc_data_put(data, client_id, client_id_len);
326                         if (ret != 0)
327                                 WARNING("pims_ipc_data_put fail(%d)", ret);
328                         break;
329                 }
330
331                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
332                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
333                         call_id = calloc(1, call_id_len+1);
334                         if (call_id == NULL) {
335                                 ERROR("calloc fail");
336                                 break;
337                         }
338                 }
339                 else
340                         break;
341
342                 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
343                 if (ret < 0) {  ERROR("socket_recv error"); break;      }
344                 read_len += ret;
345
346                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
347                 if (is_data) {
348                         unsigned int data_len;
349                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
350                         if (data_len > 0 && data_len < UINT_MAX-1) {
351                                 buf = calloc(1, data_len+1);
352                                 if (buf == NULL) {
353                                         ERROR("calloc fail");
354                                         break;
355                                 }
356                         }
357                         else
358                                 break;
359                         ret = socket_recv(handle->fd, (void *)&(buf), data_len);
360                         if (ret < 0) {  ERROR("socket_recv error"); break;      }
361                         read_len += ret;
362
363                         data = pims_ipc_data_steal_unmarshal(buf, data_len);
364                         if (NULL == data) {
365                                 ERROR("pims_ipc_data_steal_unmarshal() Fail");
366                                 break;
367                         }
368                         buf = NULL;
369                 }
370
371                 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no);
372         } while(0);
373         free(client_id);
374         free(call_id);
375         free(buf);
376
377         if (sequence_no == handle->call_sequence_no) {
378                 if (data_out != NULL) {
379                         *data_out = data;
380                 }
381                 else if (data)
382                         pims_ipc_data_destroy(data);
383                 is_ok = TRUE;
384         }
385         else {
386                 if (data)
387                         pims_ipc_data_destroy(data);
388                 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
389         }
390
391         if (is_ok)
392                 return 0;
393
394         return -1;
395 }
396
397 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
398 {
399         int ret = -1;
400         struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd));
401         if (NULL == pollfds) {
402                 ERROR("calloc() Fail");
403                 return -1;
404         }
405
406         pollfds[0].fd = handle->fd;
407         pollfds[0].events = POLLIN | POLLERR | POLLHUP;
408
409         while(1) {
410                 while(1) {
411                         ret = poll(pollfds, 1, 1000);
412                         if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
413                                 continue;
414                         }
415                         break;
416                 }
417
418                 if (ret > 0) {
419                         if (pollfds[0].revents & (POLLERR|POLLHUP)) {
420                                 ERROR("Server disconnected");
421                                 ret = -1;
422                                 break;
423                         }
424                         if (pollfds[0].revents & POLLIN) {
425                                 ret = __pims_ipc_read_data(handle, data_out);
426                                 break;
427                         }
428                 }
429         }
430         free (pollfds);
431         return ret;
432 }
433
434 static int __open_subscribe_fd(pims_ipc_s *handle)
435 {
436         // router inproc eventfd
437         int subscribe_fd = eventfd(0,0);
438         int flags;
439         int ret;
440
441         if (-1 == subscribe_fd) {
442                 ERROR("eventfd error : %d", errno);
443                 return -1;
444         }
445         VERBOSE("subscribe :%d\n", subscribe_fd);
446
447         flags = fcntl (subscribe_fd, F_GETFL, 0);
448         if (flags == -1)
449                 flags = 0;
450         ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK);
451         VERBOSE("subscribe fcntl : %d\n", ret);
452
453         handle->subscribe_fd = subscribe_fd;
454         return 0;
455 }
456
457 static int __subscribe_data(pims_ipc_s * handle)
458 {
459         int len;
460         int ret = -1;
461         char *call_id = NULL;
462         char *buf = NULL;
463         pims_ipc_data_h dhandle = NULL;
464
465         do {
466                 /* read the size of message. note that ioctl is non-blocking */
467                 if (ioctl(handle->fd, FIONREAD, &len)) {
468                         ERROR("ioctl failed: %d", errno);
469                         break;
470                 }
471
472                 /* when server or client closed socket */
473                 if (len == 0) {
474                         INFO("[IPC Socket] connection is closed");
475                         ret = -1;
476                         break;
477                 }
478
479                 unsigned int read_len = 0;
480                 unsigned int total_len = 0;
481                 unsigned int call_id_len = 0;
482                 unsigned int is_data = FALSE;
483
484                 // get total_len
485                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
486
487                 // call_id
488                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
489                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
490                         call_id = calloc(1, call_id_len+1);
491                         if (call_id == NULL) {
492                                 ERROR("calloc fail");
493                                 break;
494                         }
495                 }
496                 else
497                         break;
498
499                 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
500                 if (ret < 0) {  ERROR("socket_recv error"); break; }
501                 read_len += ret;
502
503                 // is_data
504                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
505
506                 if (is_data) {
507                         unsigned int data_len;
508                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
509                         if (data_len > 0 && data_len < UINT_MAX-1) {
510                                 buf = calloc(1, data_len+1);
511                                 if (buf == NULL) {
512                                         ERROR("calloc fail");
513                                         break;
514                                 }
515                         }
516                         else
517                                 break;
518                         ret = socket_recv(handle->fd, (void *)&(buf), data_len);
519                         if (ret < 0) {
520                                 ERROR("socket_recv error");
521                                 break;
522                         }
523                         read_len += ret;
524
525                         dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
526                         if (NULL == dhandle) {
527                                 ERROR("pims_ipc_data_steal_unmarshal() Fail");
528                                 break;
529                         }
530                         buf = NULL;
531
532                         pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s));
533                         if (NULL == sub_data) {
534                                 ERROR("calloc() Fail");
535                                 pims_ipc_data_destroy(dhandle);
536                                 ret = -1;
537                                 break;
538                         }
539                         sub_data->handle = dhandle;
540                         sub_data->call_id = call_id;
541                         call_id = NULL;
542
543                         pthread_mutex_lock(&handle->data_queue_mutex);
544                         handle->data_queue = g_list_append(handle->data_queue, sub_data);
545                         pthread_mutex_unlock(&handle->data_queue_mutex);
546                         write_command(handle->subscribe_fd, 1);
547                 }
548                 ret = 0;
549         } while(0);
550
551         free(call_id);
552         free(buf);
553         return ret;
554 }
555
556 static gboolean __hung_up_cb(gpointer data)
557 {
558         pthread_mutex_lock(&__disconnect_cb_mutex);
559         if (_server_disconnected_cb.callback)
560                 _server_disconnected_cb.callback(_server_disconnected_cb.user_data);
561         pthread_mutex_unlock(&__disconnect_cb_mutex);
562         return FALSE;
563 }
564
565 static void* __io_thread(void *data)
566 {
567         pims_ipc_s *handle = data;
568         struct epoll_event ev = {0};
569         int ret;
570         int epfd;
571
572         epfd = epoll_create(MAX_EPOLL_EVENT);
573
574         ev.events = EPOLLIN | EPOLLHUP;
575         ev.data.fd = handle->fd;
576
577         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
578         WARN_IF(ret != 0, "listen error :%d", ret);
579
580         while (!handle->epoll_stop_thread) {
581                 int i = 0;
582                 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
583                 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
584
585                 if (handle->epoll_stop_thread)
586                         break;
587
588                 if (event_num == -1) {
589                         if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
590                                 ERROR("errno:%d\n", errno);
591                                 break;
592                         }
593                 }
594
595                 for (i = 0; i < event_num; i++) {
596                         if (events[i].events & EPOLLHUP) {
597                                 ERROR("server fd closed");
598                                 g_idle_add(__hung_up_cb, NULL);
599                                 handle->epoll_stop_thread = true;
600                                 break;
601                         }
602
603                         if (events[i].events & EPOLLIN) {
604                                 if(__subscribe_data(handle) < 0) {
605                                         ERROR("server fd closed");
606                                         g_idle_add(__hung_up_cb, NULL);
607                                         break;
608                                 }
609                         }
610                 }
611         }
612
613         close(epfd);
614
615         pthread_exit(NULL);
616 }
617
618 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
619 {
620         pims_ipc_s *handle = NULL;
621         gboolean is_ok = FALSE;
622
623         pthread_mutex_lock(&__gmutex);
624
625         do {
626                 struct sockaddr_un server_addr;
627                 int ret;
628
629                 ref_cnt++;
630                 VERBOSE("Create %d th..", ref_cnt);
631
632                 handle = g_new0(pims_ipc_s, 1);
633                 if (handle == NULL) {
634                         ERROR("Failed to allocation");
635                         break;
636                 }
637
638                 handle->subscribe_fd = -1;
639                 handle->io_thread = 0;
640                 handle->service = g_strdup(service);
641                 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
642                 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
643                 if (handle->fd < 0) {
644                         ERROR("socket error : %d, errno: %d", handle->fd, errno);
645                         break;
646                 }
647                 int flags = fcntl (handle->fd, F_GETFL, 0);
648                 if (flags == -1)
649                         flags = 0;
650                 ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK);
651                 VERBOSE("socket fcntl : %d\n", ret);
652
653                 pthread_mutex_init(&handle->call_status_mutex, 0);
654
655                 pthread_mutex_lock(&handle->call_status_mutex);
656                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
657                 pthread_mutex_unlock(&handle->call_status_mutex);
658
659                 bzero(&server_addr, sizeof(server_addr));
660                 server_addr.sun_family = AF_UNIX;
661                 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
662
663                 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
664                 if (ret != 0) {
665                         ERROR("connect error : %d, errno: %d", ret, errno);
666                         break;
667                 }
668                 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
669
670                 if (mode == PIMS_IPC_MODE_REQ) {
671                         handle->call_sequence_no = (unsigned int)time(NULL);
672                         ret = __pims_ipc_send_identify(handle);
673                         if (ret < 0) {
674                                 ERROR("__pims_ipc_send_identify error");
675                                 break;
676                         }
677                         __pims_ipc_receive(handle, NULL);
678
679                         if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) {
680                                 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
681                         }
682                 }
683                 else {
684                         handle->epoll_stop_thread = false;
685                         pthread_mutex_init(&handle->data_queue_mutex, 0);
686
687                         pthread_mutex_lock(&handle->data_queue_mutex);
688                         handle->data_queue = NULL;
689                         pthread_mutex_unlock(&handle->data_queue_mutex);
690
691                         ret = __open_subscribe_fd(handle);
692                         if (ret < 0)
693                                 break;
694
695                         pthread_t worker;
696                         ret = pthread_create(&worker, NULL, __io_thread, handle);
697                         if (ret != 0)
698                                 break;
699                         handle->io_thread  = worker;
700
701                         GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
702                         if (!async_channel) {
703                                 ERROR("g_io_channel_unix_new error");
704                                 break;
705                         }
706                         handle->async_channel = async_channel;
707                         handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
708                         handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
709                         ASSERT(handle->subscribe_cb_table);
710
711                         // add a subscriber handle to the global list
712                         subscribe_handles = g_list_append(subscribe_handles, handle);
713                         VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
714                 }
715
716                 is_ok = TRUE;
717                 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
718         } while(0);
719
720         pthread_mutex_unlock(&__gmutex);
721
722         if (FALSE == is_ok) {
723                 if (handle) {
724                         __pims_ipc_free_handle(handle);
725                         handle = NULL;
726                 }
727         }
728
729         return handle;
730 }
731
732 API pims_ipc_h pims_ipc_create(char *service)
733 {
734         return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
735 }
736
737 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
738 {
739         return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
740 }
741
742 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
743 {
744         pims_ipc_s *handle = (pims_ipc_s *)ipc;
745
746         if (mode == PIMS_IPC_MODE_REQ) {
747                 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) {
748                         WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
749                 }
750         }
751
752         if (handle)
753                 __pims_ipc_free_handle(handle);
754 }
755
756 API void pims_ipc_destroy(pims_ipc_h ipc)
757 {
758         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
759 }
760
761 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
762 {
763         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
764 }
765
766 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
767 {
768         int ret = -1;
769         unsigned int sequence_no = 0;
770         gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
771         unsigned int call_id_len = strlen(call_id);
772         pims_ipc_data_s *data = NULL;
773         unsigned int is_data = FALSE;
774         unsigned int client_id_len = strlen(handle->id);
775         int length = 0;
776
777         GET_CALL_SEQUNECE_NO(handle, sequence_no);
778
779         int len = sizeof(unsigned int)                                          // total size
780                 + client_id_len + sizeof(unsigned int)  // client_id
781                 + sizeof(unsigned int)                                          // seq_no
782                 + call_id_len + sizeof(unsigned int)    // call_id
783                 + sizeof(unsigned int);                                         // is data
784
785         int total_len = len;
786
787         if (data_in) {
788                 is_data = TRUE;
789                 data = (pims_ipc_data_s*)data_in;
790                 len += sizeof(unsigned int);
791                 total_len = len + data->buf_size;
792         }
793
794         INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no);
795
796         char buf[len+1];
797
798         memset(buf, 0x0, len+1);
799
800         memcpy(buf, (void*)&total_len, sizeof(unsigned int));
801         length += sizeof(unsigned int);
802
803         // client_id
804         client_id_len = strlen(handle->id);
805         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
806         length += sizeof(unsigned int);
807         memcpy(buf+length, (void*)(handle->id), client_id_len);
808         length += client_id_len;
809
810         // seq_no
811         memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
812         length += sizeof(unsigned int);
813
814         // call id
815         memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
816         length += sizeof(unsigned int);
817         memcpy(buf+length, (void*)(call_id), call_id_len);
818         length += call_id_len;
819         g_free(call_id);
820
821         // is_data
822         memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
823         length += sizeof(unsigned int);
824
825         if (is_data) {
826                 memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int));
827                 length += sizeof(unsigned int);
828
829                 ret = socket_send(handle->fd, buf, length);
830                 if (ret > 0)
831                         ret = socket_send_data(handle->fd, data->buf, data->buf_size);
832         }
833         else {
834                 ret = socket_send(handle->fd, buf, length);
835         }
836
837         if (ret < 0)
838                 return -1;
839
840         return 0;
841 }
842
843 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
844                 pims_ipc_data_h *data_out)
845 {
846         pims_ipc_s *handle = (pims_ipc_s *)ipc;
847
848
849         if (ipc == NULL) {
850                 ERROR("invalid handle : %p", ipc);
851                 return -1;
852         }
853
854         if (!module || !function) {
855                 ERROR("invalid argument");
856                 return -1;
857         }
858
859         pthread_mutex_lock(&handle->call_status_mutex);
860         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
861                 pthread_mutex_unlock(&handle->call_status_mutex);
862                 ERROR("the previous call is in progress : %p", ipc);
863                 return -1;
864         }
865         pthread_mutex_unlock(&handle->call_status_mutex);
866
867
868         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
869                 return -1;
870         }
871
872         if (__pims_ipc_receive(handle, data_out) != 0) {
873                 return -1;
874         }
875
876         return 0;
877 }
878
879 static gboolean __call_async_idler_cb(gpointer data)
880 {
881         VERBOSE("");
882
883         pims_ipc_s *handle = (pims_ipc_s *)data;
884         ASSERT(handle);
885         ASSERT(handle->dhandle_for_async_idler);
886         pims_ipc_data_h dhandle = handle->dhandle_for_async_idler;
887         handle->dhandle_for_async_idler = NULL;
888
889         pthread_mutex_lock(&handle->call_status_mutex);
890         handle->call_status = PIMS_IPC_CALL_STATUS_READY;
891         pthread_mutex_unlock(&handle->call_status_mutex);
892
893         handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
894         pims_ipc_data_destroy(dhandle);
895
896         return FALSE;
897 }
898
899 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
900 {
901         pims_ipc_s *handle = (pims_ipc_s *)data;
902         pims_ipc_data_h dhandle = NULL;
903
904         if (__pims_ipc_receive(handle, &dhandle) == 0) {
905                 VERBOSE("call status = %d", handle->call_status);
906
907                 pthread_mutex_lock(&handle->call_status_mutex);
908                 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
909                         pthread_mutex_unlock(&handle->call_status_mutex);
910                         pims_ipc_data_destroy(dhandle);
911                 }
912                 else {
913                         pthread_mutex_unlock(&handle->call_status_mutex);
914                         if (src == NULL) {    // A response is arrived too quickly
915                                 handle->dhandle_for_async_idler = dhandle;
916                                 g_idle_add(__call_async_idler_cb, handle);
917                         }
918                         else {
919                                 pthread_mutex_lock(&handle->call_status_mutex);
920                                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
921                                 pthread_mutex_unlock(&handle->call_status_mutex);
922
923                                 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
924                                 pims_ipc_data_destroy(dhandle);
925                         }
926                 }
927         }
928         return FALSE;
929 }
930
931 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
932                 pims_ipc_call_async_cb callback, void *userdata)
933 {
934         pims_ipc_s *handle = (pims_ipc_s *)ipc;
935         guint source_id = 0;
936
937         if (ipc == NULL) {
938                 ERROR("invalid handle : %p", ipc);
939                 return -1;
940         }
941
942         if (!module || !function || !callback) {
943                 ERROR("invalid argument");
944                 return -1;
945         }
946
947         pthread_mutex_lock(&handle->call_status_mutex);
948         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
949                 pthread_mutex_unlock(&handle->call_status_mutex);
950                 ERROR("the previous call is in progress : %p", ipc);
951                 return -1;
952         }
953         pthread_mutex_unlock(&handle->call_status_mutex);
954
955         pthread_mutex_lock(&handle->call_status_mutex);
956         handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
957         pthread_mutex_unlock(&handle->call_status_mutex);
958
959         handle->call_async_callback = callback;
960         handle->call_async_userdata = userdata;
961
962         // add a callback for GIOChannel
963         if (!handle->async_channel) {
964                 handle->async_channel = g_io_channel_unix_new(handle->fd);
965                 if (!handle->async_channel) {
966                         ERROR("g_io_channel_unix_new error");
967                         return -1;
968                 }
969         }
970
971         source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
972         handle->async_source_id = source_id;
973
974         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
975                 g_source_remove(source_id);
976                 return -1;
977         }
978
979         __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
980
981         return 0;
982 }
983
984 API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
985 {
986         int ret;
987         pims_ipc_s *handle = (pims_ipc_s *)ipc;
988
989         if (ipc == NULL) {
990                 ERROR("invalid handle : %p", ipc);
991                 return false;
992         }
993
994         pthread_mutex_lock(&handle->call_status_mutex);
995         if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
996                 ret = true;
997         else
998                 ret = false;
999         pthread_mutex_unlock(&handle->call_status_mutex);
1000         return ret;
1001 }
1002
1003 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
1004 {
1005         gchar *call_id = NULL;
1006         pims_ipc_cb_s *cb_data = NULL;
1007         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1008
1009         if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1010                 ERROR("invalid handle : %p", ipc);
1011                 return -1;
1012         }
1013
1014         if (!module || !event || !callback) {
1015                 ERROR("invalid argument");
1016                 return -1;
1017         }
1018
1019         cb_data = g_new0(pims_ipc_cb_s, 1);
1020         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1021
1022         VERBOSE("subscribe cb id[%s]", call_id);
1023         cb_data->callback = callback;
1024         cb_data->user_data = userdata;
1025         g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
1026
1027         return 0;
1028 }
1029
1030 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1031 {
1032         gchar *call_id = NULL;
1033         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1034
1035         if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1036                 ERROR("invalid handle : %p", ipc);
1037                 return -1;
1038         }
1039
1040         if (!module || !event) {
1041                 ERROR("invalid argument");
1042                 return -1;
1043         }
1044
1045         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1046
1047         VERBOSE("unsubscribe cb id[%s]", call_id);
1048
1049         if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1050                 ERROR("g_hash_table_remove error");
1051                 g_free(call_id);
1052                 return -1;
1053         }
1054
1055         g_free(call_id);
1056         return 0;
1057 }
1058
1059 API int pims_ipc_unset_server_disconnected_cb()
1060 {
1061         pthread_mutex_lock(&__disconnect_cb_mutex);
1062         _server_disconnected_cb.callback = NULL;
1063         _server_disconnected_cb.user_data = NULL;
1064         pthread_mutex_unlock(&__disconnect_cb_mutex);
1065         return 0;
1066 }
1067
1068 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data)
1069 {
1070         pthread_mutex_lock(&__disconnect_cb_mutex);
1071         _server_disconnected_cb.callback = callback;
1072         _server_disconnected_cb.user_data = user_data;
1073         pthread_mutex_unlock(&__disconnect_cb_mutex);
1074         return 0;
1075 }