prevent of SE review fixed(CID 298857)
[platform/core/pim/pims-ipc.git] / src / pims-ipc.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <time.h>
24 #include <glib.h>
25 #include <stdint.h>
26 #include <pthread.h>
27 #include <poll.h> // pollfds
28 #include <sys/un.h> // sockaddr_un
29 #include <sys/ioctl.h> // ioctl
30 #include <sys/socket.h> //socket
31 #include <sys/types.h>
32 #include <sys/epoll.h> // epoll
33 #include <sys/eventfd.h> // eventfd
34 #include <fcntl.h>
35 #include <errno.h>
36
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-debug.h"
40 #include "pims-ipc-data.h"
41 #include "pims-ipc-data-internal.h"
42 #include "pims-ipc.h"
43
44 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
45         sequence_no = ++((handle)->call_sequence_no);\
46 } while (0)
47
48 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
49
50 typedef enum
51 {
52         PIMS_IPC_CALL_STATUS_READY = 0,
53         PIMS_IPC_CALL_STATUS_IN_PROGRESS
54 } pims_ipc_call_status_e;
55
56 typedef enum
57 {
58         PIMS_IPC_MODE_REQ = 0,
59         PIMS_IPC_MODE_SUB
60 } pims_ipc_mode_e;
61
62 typedef struct
63 {
64         pims_ipc_subscribe_cb callback;
65         void * user_data;
66 } pims_ipc_cb_s;
67
68 typedef struct
69 {
70         char *call_id;
71         pims_ipc_data_h *handle;
72 }pims_ipc_subscribe_data_s;
73
74 typedef struct
75 {
76         int fd;
77         char *service;
78         char *id;
79         GIOChannel *async_channel;
80         guint async_source_id;
81         pthread_mutex_t call_status_mutex;
82         pims_ipc_call_status_e call_status;
83         unsigned int call_sequence_no;
84         pims_ipc_call_async_cb call_async_callback;
85         void *call_async_userdata;
86         pims_ipc_data_h dhandle_for_async_idler;
87
88         int subscribe_fd;
89         int epoll_stop_thread;
90         pthread_t io_thread;
91         GHashTable *subscribe_cb_table;
92
93         pthread_mutex_t data_queue_mutex;
94         GList *data_queue;
95 } pims_ipc_s;
96
97 static unsigned int ref_cnt;
98 static GList *subscribe_handles;
99
100 typedef struct {
101         pims_ipc_server_disconnected_cb callback;
102         void *user_data;
103 } pims_ipc_server_disconnected_cb_t;
104
105 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
106 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
107
108 static void __sub_data_free(gpointer user_data)
109 {
110         pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
111         pims_ipc_data_destroy(data->handle);
112         free(data->call_id);
113         free(data);
114 }
115
116 static void __pims_ipc_free_handle(pims_ipc_s *handle)
117 {
118         pthread_mutex_lock(&__gmutex);
119
120         handle->epoll_stop_thread = true;
121
122         if (handle->fd != -1)
123                 close(handle->fd);
124
125         pthread_mutex_unlock(&__gmutex);
126         if (handle->io_thread)
127                 pthread_join(handle->io_thread, NULL);
128         pthread_mutex_lock(&__gmutex);
129
130         g_free(handle->id);
131         g_free(handle->service);
132
133         if (handle->async_channel) {
134                 // remove a subscriber handle from the golbal list
135                 subscribe_handles = g_list_remove(subscribe_handles, handle);
136                 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
137
138                 g_source_remove(handle->async_source_id);
139                 g_io_channel_unref(handle->async_channel);
140         }
141
142         if (handle->subscribe_cb_table)
143                 g_hash_table_destroy(handle->subscribe_cb_table);
144
145         pthread_mutex_lock(&handle->data_queue_mutex);
146         if (handle->data_queue) {
147                 g_list_free_full(handle->data_queue, __sub_data_free);
148         }
149         pthread_mutex_unlock(&handle->data_queue_mutex);
150         pthread_mutex_destroy(&handle->data_queue_mutex);
151
152         if (handle->subscribe_fd != -1)
153                 close(handle->subscribe_fd);
154
155         pthread_mutex_destroy(&handle->call_status_mutex);
156
157         g_free(handle);
158
159         if (--ref_cnt <= 0) {
160                 if (subscribe_handles)
161                         g_list_free(subscribe_handles);
162                 subscribe_handles = NULL;
163         }
164
165         pthread_mutex_unlock(&__gmutex);
166 }
167
168 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
169 {
170         pims_ipc_cb_s *cb_data = NULL;
171         uint64_t dummy;
172
173         do {
174                 read_command(handle->subscribe_fd, &dummy);
175
176                 pthread_mutex_lock(&handle->data_queue_mutex);
177                 if (!handle->data_queue) {
178                         pthread_mutex_unlock(&handle->data_queue_mutex);
179                         break;
180                 }
181
182                 GList *cursor = g_list_first(handle->data_queue);
183                 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
184                 if (data == NULL) {
185                         pthread_mutex_unlock(&handle->data_queue_mutex);
186                         break;
187                 }
188
189                 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
190                 if (cb_data == NULL) {
191                         VERBOSE("unable to find %s", call_id);
192                 }
193                 else
194                         cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
195
196                 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
197                 __sub_data_free(data);
198                 pthread_mutex_unlock(&handle->data_queue_mutex);
199         } while(1);
200
201         return 0;
202 }
203
204 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
205 {
206         pims_ipc_s *handle = (pims_ipc_s *)data;
207
208         VERBOSE("");
209
210         if (condition & G_IO_HUP)
211                 return FALSE;
212
213         pthread_mutex_lock(&__gmutex);
214
215         // check if a subscriber handle is exists
216         if (g_list_find(subscribe_handles, handle) == NULL) {
217                 ERROR("No such handle that ID is %p", handle);
218                 pthread_mutex_unlock(&__gmutex);
219                 return FALSE;
220         }
221
222         __pims_ipc_receive_for_subscribe(handle);
223
224         pthread_mutex_unlock(&__gmutex);
225
226         return TRUE;
227 }
228
229 static unsigned int __get_global_sequence_no()
230 {
231         static unsigned int __gsequence_no = 0xffffffff;
232
233         if (__gsequence_no == 0xffffffff)
234                 __gsequence_no = (unsigned int)time(NULL);
235         else
236                 __gsequence_no++;
237         return __gsequence_no;
238 }
239
240 static int __pims_ipc_send_identify(pims_ipc_s *handle)
241 {
242         unsigned int sequence_no;
243         unsigned int client_id_len = strlen(handle->id);
244         unsigned int len = sizeof(unsigned int)         // total size
245                 + client_id_len + sizeof(unsigned int)          // client_id
246                 + sizeof(unsigned int)  ;       // seq_no
247
248         char buf[len+1];
249         int length = 0;
250         memset(buf, 0x0, len+1);
251
252         // total len
253         memcpy(buf, (void*)&len, sizeof(unsigned int));
254         length += sizeof(unsigned int);
255
256         // client_id
257         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
258         length += sizeof(unsigned int);
259         memcpy(buf+length, (void*)(handle->id), client_id_len);
260         length += client_id_len;
261
262         // seq_no
263         GET_CALL_SEQUNECE_NO(handle, sequence_no);
264         memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
265         length += sizeof(unsigned int);
266
267         return socket_send(handle->fd, buf, length);
268 }
269
270 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
271 {
272         int ret;
273         gboolean is_ok = FALSE;
274         int len = 0;
275         pims_ipc_data_h data = NULL;
276         unsigned int sequence_no = 0;
277         char *client_id = NULL;
278         char *call_id = NULL;
279         char *buf = NULL;
280
281         /* read the size of message. note that ioctl is non-blocking */
282         if (ioctl(handle->fd, FIONREAD, &len)) {
283                 ERROR("ioctl failed: %d", errno);
284                 return -1;
285         }
286
287         /* when server or client closed socket */
288         if (len == 0) {
289                 ERROR("[IPC Socket] connection is closed");
290                 return -1;
291         }
292
293         do {
294                 unsigned int read_len = 0;
295                 unsigned int total_len = 0;
296                 unsigned int client_id_len = 0;
297                 unsigned int call_id_len = 0;
298                 unsigned int is_data = FALSE;
299
300                 // get total_len
301                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
302
303                 // client_id
304                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int)));
305                 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
306                         client_id = calloc(1, client_id_len+1);
307                         if (client_id == NULL) {
308                                 ERROR("calloc fail");
309                                 break;
310                         }
311                 }
312                 else
313                         break;
314                 ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len);
315                 if (ret < 0) {  ERROR("socket_recv error"); break;      }
316                 read_len += ret;
317
318                 // sequence no
319                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int)));
320                 if (total_len == read_len) {
321                         // send identity
322                         data = pims_ipc_data_create(0);
323                         if (NULL == data) {
324                                 ERROR("pims_ipc_data_create() Fail");
325                                 break;
326                         }
327                         ret = pims_ipc_data_put(data, client_id, client_id_len);
328                         if (ret != 0)
329                                 WARNING("pims_ipc_data_put fail(%d)", ret);
330                         break;
331                 }
332
333                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
334                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
335                         call_id = calloc(1, call_id_len+1);
336                         if (call_id == NULL) {
337                                 ERROR("calloc fail");
338                                 break;
339                         }
340                 }
341                 else
342                         break;
343
344                 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
345                 if (ret < 0) {  ERROR("socket_recv error"); break;      }
346                 read_len += ret;
347
348                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
349                 if (is_data) {
350                         unsigned int data_len;
351                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
352                         if (data_len > 0 && data_len < UINT_MAX-1) {
353                                 buf = calloc(1, data_len+1);
354                                 if (buf == NULL) {
355                                         ERROR("calloc fail");
356                                         break;
357                                 }
358                         }
359                         else
360                                 break;
361                         ret = socket_recv(handle->fd, (void *)&(buf), data_len);
362                         if (ret < 0) {  ERROR("socket_recv error"); break;      }
363                         read_len += ret;
364
365                         data = pims_ipc_data_steal_unmarshal(buf, data_len);
366                         if (NULL == data) {
367                                 ERROR("pims_ipc_data_steal_unmarshal() Fail");
368                                 break;
369                         }
370                         buf = NULL;
371                 }
372
373                 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no);
374         } while(0);
375         free(client_id);
376         free(call_id);
377         free(buf);
378
379         if (sequence_no == handle->call_sequence_no) {
380                 if (data_out != NULL) {
381                         *data_out = data;
382                 }
383                 else if (data)
384                         pims_ipc_data_destroy(data);
385                 is_ok = TRUE;
386         }
387         else {
388                 if (data)
389                         pims_ipc_data_destroy(data);
390                 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
391         }
392
393         if (is_ok)
394                 return 0;
395
396         return -1;
397 }
398
399 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
400 {
401         int ret = -1;
402         struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd));
403         if (NULL == pollfds) {
404                 ERROR("calloc() Fail");
405                 return -1;
406         }
407
408         pollfds[0].fd = handle->fd;
409         pollfds[0].events = POLLIN | POLLERR | POLLHUP;
410
411         while(1) {
412                 while(1) {
413                         ret = poll(pollfds, 1, 1000);
414                         if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
415                                 continue;
416                         }
417                         break;
418                 }
419
420                 if (ret > 0) {
421                         if (pollfds[0].revents & (POLLERR|POLLHUP)) {
422                                 ERROR("Server disconnected");
423                                 ret = -1;
424                                 break;
425                         }
426                         if (pollfds[0].revents & POLLIN) {
427                                 ret = __pims_ipc_read_data(handle, data_out);
428                                 break;
429                         }
430                 }
431         }
432         free (pollfds);
433         return ret;
434 }
435
436 static int __open_subscribe_fd(pims_ipc_s *handle)
437 {
438         // router inproc eventfd
439         int subscribe_fd = eventfd(0,0);
440         int flags;
441         int ret;
442
443         if (-1 == subscribe_fd) {
444                 ERROR("eventfd error : %d", errno);
445                 return -1;
446         }
447         VERBOSE("subscribe :%d\n", subscribe_fd);
448
449         flags = fcntl (subscribe_fd, F_GETFL, 0);
450         if (flags == -1)
451                 flags = 0;
452         ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK);
453         VERBOSE("subscribe fcntl : %d\n", ret);
454
455         handle->subscribe_fd = subscribe_fd;
456         return 0;
457 }
458
459 static int __subscribe_data(pims_ipc_s * handle)
460 {
461         int len;
462         int ret = -1;
463         char *call_id = NULL;
464         char *buf = NULL;
465         pims_ipc_data_h dhandle = NULL;
466
467         do {
468                 /* read the size of message. note that ioctl is non-blocking */
469                 if (ioctl(handle->fd, FIONREAD, &len)) {
470                         ERROR("ioctl failed: %d", errno);
471                         break;
472                 }
473
474                 /* when server or client closed socket */
475                 if (len == 0) {
476                         INFO("[IPC Socket] connection is closed");
477                         ret = -1;
478                         break;
479                 }
480
481                 unsigned int read_len = 0;
482                 unsigned int total_len = 0;
483                 unsigned int call_id_len = 0;
484                 unsigned int is_data = FALSE;
485
486                 // get total_len
487                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
488
489                 // call_id
490                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
491                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
492                         call_id = calloc(1, call_id_len+1);
493                         if (call_id == NULL) {
494                                 ERROR("calloc fail");
495                                 break;
496                         }
497                 }
498                 else
499                         break;
500
501                 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
502                 if (ret < 0) {  ERROR("socket_recv error"); break; }
503                 read_len += ret;
504
505                 // is_data
506                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
507
508                 if (is_data) {
509                         unsigned int data_len;
510                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
511                         if (data_len > 0 && data_len < UINT_MAX-1) {
512                                 buf = calloc(1, data_len+1);
513                                 if (buf == NULL) {
514                                         ERROR("calloc fail");
515                                         break;
516                                 }
517                         }
518                         else
519                                 break;
520                         ret = socket_recv(handle->fd, (void *)&(buf), data_len);
521                         if (ret < 0) {
522                                 ERROR("socket_recv error");
523                                 break;
524                         }
525                         read_len += ret;
526
527                         dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
528                         if (NULL == dhandle) {
529                                 ERROR("pims_ipc_data_steal_unmarshal() Fail");
530                                 break;
531                         }
532                         buf = NULL;
533
534                         pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s));
535                         if (NULL == sub_data) {
536                                 ERROR("calloc() Fail");
537                                 pims_ipc_data_destroy(dhandle);
538                                 ret = -1;
539                                 break;
540                         }
541                         sub_data->handle = dhandle;
542                         sub_data->call_id = call_id;
543                         call_id = NULL;
544
545                         pthread_mutex_lock(&handle->data_queue_mutex);
546                         handle->data_queue = g_list_append(handle->data_queue, sub_data);
547                         pthread_mutex_unlock(&handle->data_queue_mutex);
548                         write_command(handle->subscribe_fd, 1);
549                 }
550                 ret = 0;
551         } while(0);
552
553         free(call_id);
554         free(buf);
555         return ret;
556 }
557
558 static gboolean __hung_up_cb(gpointer data)
559 {
560         pthread_mutex_lock(&__disconnect_cb_mutex);
561         if (_server_disconnected_cb.callback)
562                 _server_disconnected_cb.callback(_server_disconnected_cb.user_data);
563         pthread_mutex_unlock(&__disconnect_cb_mutex);
564         return FALSE;
565 }
566
567 static void* __io_thread(void *data)
568 {
569         pims_ipc_s *handle = data;
570         struct epoll_event ev = {0};
571         int ret;
572         int epfd;
573
574         epfd = epoll_create(MAX_EPOLL_EVENT);
575
576         pthread_mutex_lock(&__gmutex);
577
578         ev.events = EPOLLIN | EPOLLHUP;
579         ev.data.fd = handle->fd;
580
581         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
582         WARN_IF(ret != 0, "listen error :%d", ret);
583         pthread_mutex_unlock(&__gmutex);
584
585
586         while (1) {
587                 int i = 0;
588
589                 pthread_mutex_lock(&__gmutex);
590
591                 if (handle->epoll_stop_thread) {
592                         pthread_mutex_unlock(&__gmutex);
593                         break;
594                 }
595                 pthread_mutex_unlock(&__gmutex);
596
597                 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
598                 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
599
600                 pthread_mutex_lock(&__gmutex);
601
602                 if (handle->epoll_stop_thread) {
603                         pthread_mutex_unlock(&__gmutex);
604                         break;
605                 }
606                 pthread_mutex_unlock(&__gmutex);
607
608                 if (event_num == -1) {
609                         if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
610                                 ERROR("errno:%d\n", errno);
611                                 break;
612                         }
613                 }
614
615                 pthread_mutex_lock(&__gmutex);
616                 for (i = 0; i < event_num; i++) {
617                         if (events[i].events & EPOLLHUP) {
618                                 ERROR("server fd closed");
619                                 g_idle_add(__hung_up_cb, NULL);
620                                 handle->epoll_stop_thread = true;
621                                 break;
622                         }
623
624                         if (events[i].events & EPOLLIN) {
625                                 if(__subscribe_data(handle) < 0) {
626                                         ERROR("server fd closed");
627                                         g_idle_add(__hung_up_cb, NULL);
628                                         handle->epoll_stop_thread = true;
629                                         break;
630                                 }
631                         }
632                 }
633                 pthread_mutex_unlock(&__gmutex);
634         }
635
636         close(epfd);
637
638         pthread_exit(NULL);
639 }
640
641 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
642 {
643         pims_ipc_s *handle = NULL;
644         gboolean is_ok = FALSE;
645
646         pthread_mutex_lock(&__gmutex);
647
648         do {
649                 struct sockaddr_un server_addr;
650                 int ret;
651
652                 ref_cnt++;
653                 VERBOSE("Create %d th..", ref_cnt);
654
655                 handle = g_new0(pims_ipc_s, 1);
656                 if (handle == NULL) {
657                         ERROR("Failed to allocation");
658                         break;
659                 }
660
661                 handle->subscribe_fd = -1;
662                 handle->io_thread = 0;
663                 handle->service = g_strdup(service);
664                 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
665                 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
666                 if (handle->fd < 0) {
667                         ERROR("socket error : %d, errno: %d", handle->fd, errno);
668                         break;
669                 }
670                 int flags = fcntl (handle->fd, F_GETFL, 0);
671                 if (flags == -1)
672                         flags = 0;
673                 ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK);
674                 VERBOSE("socket fcntl : %d\n", ret);
675
676                 pthread_mutex_init(&handle->call_status_mutex, 0);
677
678                 pthread_mutex_lock(&handle->call_status_mutex);
679                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
680                 pthread_mutex_unlock(&handle->call_status_mutex);
681
682                 bzero(&server_addr, sizeof(server_addr));
683                 server_addr.sun_family = AF_UNIX;
684                 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
685
686                 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
687                 if (ret != 0) {
688                         ERROR("connect error : %d, errno: %d", ret, errno);
689                         break;
690                 }
691                 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
692
693                 if (mode == PIMS_IPC_MODE_REQ) {
694                         handle->call_sequence_no = (unsigned int)time(NULL);
695                         ret = __pims_ipc_send_identify(handle);
696                         if (ret < 0) {
697                                 ERROR("__pims_ipc_send_identify error");
698                                 break;
699                         }
700                         __pims_ipc_receive(handle, NULL);
701
702                         if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) {
703                                 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
704                         }
705                 }
706                 else {
707                         handle->epoll_stop_thread = false;
708                         pthread_mutex_init(&handle->data_queue_mutex, 0);
709
710                         pthread_mutex_lock(&handle->data_queue_mutex);
711                         handle->data_queue = NULL;
712                         pthread_mutex_unlock(&handle->data_queue_mutex);
713
714                         ret = __open_subscribe_fd(handle);
715                         if (ret < 0)
716                                 break;
717
718                         pthread_t worker;
719                         ret = pthread_create(&worker, NULL, __io_thread, handle);
720                         if (ret != 0)
721                                 break;
722                         handle->io_thread  = worker;
723
724                         GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
725                         if (!async_channel) {
726                                 ERROR("g_io_channel_unix_new error");
727                                 break;
728                         }
729                         handle->async_channel = async_channel;
730                         handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
731                         handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
732                         ASSERT(handle->subscribe_cb_table);
733
734                         // add a subscriber handle to the global list
735                         subscribe_handles = g_list_append(subscribe_handles, handle);
736                         VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
737                 }
738
739                 is_ok = TRUE;
740                 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
741         } while(0);
742
743         pthread_mutex_unlock(&__gmutex);
744
745         if (FALSE == is_ok) {
746                 if (handle) {
747                         __pims_ipc_free_handle(handle);
748                         handle = NULL;
749                 }
750         }
751
752         return handle;
753 }
754
755 API pims_ipc_h pims_ipc_create(char *service)
756 {
757         return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
758 }
759
760 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
761 {
762         return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
763 }
764
765 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
766 {
767         pims_ipc_s *handle = (pims_ipc_s *)ipc;
768
769         if (mode == PIMS_IPC_MODE_REQ) {
770                 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) {
771                         WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
772                 }
773         }
774
775         if (handle)
776                 __pims_ipc_free_handle(handle);
777 }
778
779 API void pims_ipc_destroy(pims_ipc_h ipc)
780 {
781         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
782 }
783
784 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
785 {
786         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
787 }
788
789 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
790 {
791         int ret = -1;
792         unsigned int sequence_no = 0;
793         gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
794         unsigned int call_id_len = strlen(call_id);
795         pims_ipc_data_s *data = NULL;
796         unsigned int is_data = FALSE;
797         unsigned int client_id_len = strlen(handle->id);
798         int length = 0;
799
800         GET_CALL_SEQUNECE_NO(handle, sequence_no);
801
802         int len = sizeof(unsigned int)                                          // total size
803                 + client_id_len + sizeof(unsigned int)  // client_id
804                 + sizeof(unsigned int)                                          // seq_no
805                 + call_id_len + sizeof(unsigned int)    // call_id
806                 + sizeof(unsigned int);                                         // is data
807
808         int total_len = len;
809
810         if (data_in) {
811                 is_data = TRUE;
812                 data = (pims_ipc_data_s*)data_in;
813                 len += sizeof(unsigned int);
814                 total_len = len + data->buf_size;
815         }
816
817         INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no);
818
819         char buf[len+1];
820
821         memset(buf, 0x0, len+1);
822
823         memcpy(buf, (void*)&total_len, sizeof(unsigned int));
824         length += sizeof(unsigned int);
825
826         // client_id
827         client_id_len = strlen(handle->id);
828         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
829         length += sizeof(unsigned int);
830         memcpy(buf+length, (void*)(handle->id), client_id_len);
831         length += client_id_len;
832
833         // seq_no
834         memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
835         length += sizeof(unsigned int);
836
837         // call id
838         memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
839         length += sizeof(unsigned int);
840         memcpy(buf+length, (void*)(call_id), call_id_len);
841         length += call_id_len;
842         g_free(call_id);
843
844         // is_data
845         memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
846         length += sizeof(unsigned int);
847
848         if (is_data) {
849                 memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int));
850                 length += sizeof(unsigned int);
851
852                 ret = socket_send(handle->fd, buf, length);
853                 if (ret > 0)
854                         ret = socket_send_data(handle->fd, data->buf, data->buf_size);
855         }
856         else {
857                 ret = socket_send(handle->fd, buf, length);
858         }
859
860         if (ret < 0)
861                 return -1;
862
863         return 0;
864 }
865
866 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
867                 pims_ipc_data_h *data_out)
868 {
869         pims_ipc_s *handle = (pims_ipc_s *)ipc;
870
871
872         if (ipc == NULL) {
873                 ERROR("invalid handle : %p", ipc);
874                 return -1;
875         }
876
877         if (!module || !function) {
878                 ERROR("invalid argument");
879                 return -1;
880         }
881
882         pthread_mutex_lock(&handle->call_status_mutex);
883         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
884                 pthread_mutex_unlock(&handle->call_status_mutex);
885                 ERROR("the previous call is in progress : %p", ipc);
886                 return -1;
887         }
888         pthread_mutex_unlock(&handle->call_status_mutex);
889
890
891         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
892                 return -1;
893         }
894
895         if (__pims_ipc_receive(handle, data_out) != 0) {
896                 return -1;
897         }
898
899         return 0;
900 }
901
902 static gboolean __call_async_idler_cb(gpointer data)
903 {
904         VERBOSE("");
905
906         pims_ipc_s *handle = (pims_ipc_s *)data;
907         ASSERT(handle);
908         ASSERT(handle->dhandle_for_async_idler);
909         pims_ipc_data_h dhandle = handle->dhandle_for_async_idler;
910         handle->dhandle_for_async_idler = NULL;
911
912         pthread_mutex_lock(&handle->call_status_mutex);
913         handle->call_status = PIMS_IPC_CALL_STATUS_READY;
914         pthread_mutex_unlock(&handle->call_status_mutex);
915
916         handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
917         pims_ipc_data_destroy(dhandle);
918
919         return FALSE;
920 }
921
922 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
923 {
924         pims_ipc_s *handle = (pims_ipc_s *)data;
925         pims_ipc_data_h dhandle = NULL;
926
927         if (__pims_ipc_receive(handle, &dhandle) == 0) {
928                 VERBOSE("call status = %d", handle->call_status);
929
930                 pthread_mutex_lock(&handle->call_status_mutex);
931                 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
932                         pthread_mutex_unlock(&handle->call_status_mutex);
933                         pims_ipc_data_destroy(dhandle);
934                 }
935                 else {
936                         pthread_mutex_unlock(&handle->call_status_mutex);
937                         if (src == NULL) {    // A response is arrived too quickly
938                                 handle->dhandle_for_async_idler = dhandle;
939                                 g_idle_add(__call_async_idler_cb, handle);
940                         }
941                         else {
942                                 pthread_mutex_lock(&handle->call_status_mutex);
943                                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
944                                 pthread_mutex_unlock(&handle->call_status_mutex);
945
946                                 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
947                                 pims_ipc_data_destroy(dhandle);
948                         }
949                 }
950         }
951         return FALSE;
952 }
953
954 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
955                 pims_ipc_call_async_cb callback, void *userdata)
956 {
957         pims_ipc_s *handle = (pims_ipc_s *)ipc;
958         guint source_id = 0;
959
960         if (ipc == NULL) {
961                 ERROR("invalid handle : %p", ipc);
962                 return -1;
963         }
964
965         if (!module || !function || !callback) {
966                 ERROR("invalid argument");
967                 return -1;
968         }
969
970         pthread_mutex_lock(&handle->call_status_mutex);
971         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
972                 pthread_mutex_unlock(&handle->call_status_mutex);
973                 ERROR("the previous call is in progress : %p", ipc);
974                 return -1;
975         }
976         pthread_mutex_unlock(&handle->call_status_mutex);
977
978         pthread_mutex_lock(&handle->call_status_mutex);
979         handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
980         pthread_mutex_unlock(&handle->call_status_mutex);
981
982         handle->call_async_callback = callback;
983         handle->call_async_userdata = userdata;
984
985         // add a callback for GIOChannel
986         if (!handle->async_channel) {
987                 handle->async_channel = g_io_channel_unix_new(handle->fd);
988                 if (!handle->async_channel) {
989                         ERROR("g_io_channel_unix_new error");
990                         return -1;
991                 }
992         }
993
994         source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
995         handle->async_source_id = source_id;
996
997         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
998                 g_source_remove(source_id);
999                 return -1;
1000         }
1001
1002         __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
1003
1004         return 0;
1005 }
1006
1007 API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1008 {
1009         int ret;
1010         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1011
1012         if (ipc == NULL) {
1013                 ERROR("invalid handle : %p", ipc);
1014                 return false;
1015         }
1016
1017         pthread_mutex_lock(&handle->call_status_mutex);
1018         if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1019                 ret = true;
1020         else
1021                 ret = false;
1022         pthread_mutex_unlock(&handle->call_status_mutex);
1023         return ret;
1024 }
1025
1026 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
1027 {
1028         gchar *call_id = NULL;
1029         pims_ipc_cb_s *cb_data = NULL;
1030         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1031
1032         if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1033                 ERROR("invalid handle : %p", ipc);
1034                 return -1;
1035         }
1036
1037         if (!module || !event || !callback) {
1038                 ERROR("invalid argument");
1039                 return -1;
1040         }
1041
1042         cb_data = g_new0(pims_ipc_cb_s, 1);
1043         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1044
1045         VERBOSE("subscribe cb id[%s]", call_id);
1046         cb_data->callback = callback;
1047         cb_data->user_data = userdata;
1048         g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
1049
1050         return 0;
1051 }
1052
1053 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1054 {
1055         gchar *call_id = NULL;
1056         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1057
1058         if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1059                 ERROR("invalid handle : %p", ipc);
1060                 return -1;
1061         }
1062
1063         if (!module || !event) {
1064                 ERROR("invalid argument");
1065                 return -1;
1066         }
1067
1068         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1069
1070         VERBOSE("unsubscribe cb id[%s]", call_id);
1071
1072         if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1073                 ERROR("g_hash_table_remove error");
1074                 g_free(call_id);
1075                 return -1;
1076         }
1077
1078         g_free(call_id);
1079         return 0;
1080 }
1081
1082 API int pims_ipc_unset_server_disconnected_cb()
1083 {
1084         pthread_mutex_lock(&__disconnect_cb_mutex);
1085         _server_disconnected_cb.callback = NULL;
1086         _server_disconnected_cb.user_data = NULL;
1087         pthread_mutex_unlock(&__disconnect_cb_mutex);
1088         return 0;
1089 }
1090
1091 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data)
1092 {
1093         pthread_mutex_lock(&__disconnect_cb_mutex);
1094         _server_disconnected_cb.callback = callback;
1095         _server_disconnected_cb.user_data = user_data;
1096         pthread_mutex_unlock(&__disconnect_cb_mutex);
1097         return 0;
1098 }