Increased wait time and error handling
[platform/core/pim/pims-ipc.git] / src / pims-ipc.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <time.h>
24 #include <glib.h>
25 #include <stdint.h>
26 #include <pthread.h>
27 #include <poll.h>       /* pollfds */
28 #include <sys/un.h>     /* sockaddr_un */
29 #include <sys/ioctl.h>  /* ioctl */
30 #include <sys/socket.h> /* socket */
31 #include <sys/types.h>
32 #include <sys/epoll.h>  /* epoll */
33 #include <sys/eventfd.h> /* eventfd */
34 #include <fcntl.h>
35 #include <errno.h>
36
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-ipc-data.h"
40 #include "pims-ipc-data-internal.h"
41 #include "pims-ipc.h"
42
/*
 * Advance handle->call_sequence_no by one and store the new value in
 * sequence_no. This is a plain, non-atomic pre-increment. The misspelled
 * name ("SEQUNECE") is kept because the macro is referenced by that name.
 */
#define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
        sequence_no = ++((handle)->call_sequence_no);\
} while (0)
46
/*
 * File-wide lock. In this file it is held while handles are created and
 * freed (ref_cnt, subscribe_handles) and while the I/O thread reads or
 * sets a handle's epoll_stop_thread flag.
 */
static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;

/* State stored in pims_ipc_s.call_status; READY is the value set at creation. */
typedef enum {
        PIMS_IPC_CALL_STATUS_READY = 0,
        PIMS_IPC_CALL_STATUS_IN_PROGRESS
} pims_ipc_call_status_e;

/*
 * Kind of handle to create or destroy:
 * REQ is used by pims_ipc_create()/pims_ipc_destroy(),
 * SUB by pims_ipc_create_for_subscribe()/pims_ipc_destroy_for_subscribe().
 */
typedef enum {
        PIMS_IPC_MODE_REQ = 0,
        PIMS_IPC_MODE_SUB
} pims_ipc_mode_e;
58
/*
 * Value type looked up in pims_ipc_s.subscribe_cb_table by call id:
 * the subscriber callback plus the opaque pointer passed back to it.
 */
typedef struct {
        pims_ipc_subscribe_cb callback;
        void * user_data;
} pims_ipc_cb_s;

/*
 * One queued notification (an element of pims_ipc_s.data_queue).
 * Released by __sub_data_free(): call_id with free(), handle with
 * pims_ipc_data_destroy().
 * NOTE(review): handle is declared as pims_ipc_data_h * but is assigned a
 * pims_ipc_data_h value in __subscribe_data() - confirm the intended type.
 */
typedef struct {
        char *call_id;
        pims_ipc_data_h *handle;
} pims_ipc_subscribe_data_s;
68
/* Client-side IPC handle; the public pims_ipc_h points at one of these. */
typedef struct {
        int fd;                         /* unix-domain socket connected to the service */
        char *service;                  /* g_strdup() copy of the service socket path */
        char *id;                       /* client id, formatted "%x:%x" from the pid and a global sequence number */
        GIOChannel *async_channel;      /* channel wrapping subscribe_fd (subscribe mode only) */
        guint disconnected_source;      /* GLib watch id on fd used to detect hang-up (request mode) */
        guint async_source_id;          /* GLib watch id on async_channel */
        pthread_mutex_t call_status_mutex;
        pims_ipc_call_status_e call_status;     /* set to READY at creation */
        unsigned int call_sequence_no;  /* bumped per message sent; responses must carry the same value */
        /* The three async fields below are not used in this part of the file. */
        pims_ipc_call_async_cb call_async_callback;
        void *call_async_userdata;
        pims_ipc_data_h dhandle_for_async_idler;

        int subscribe_fd;               /* eventfd signalled by the I/O thread when data_queue gains an entry */
        int epoll_stop_thread;          /* set to TRUE to make the I/O thread leave its loop */
        pthread_t io_thread;            /* subscribe-mode reader thread; 0 when none was started */
        GHashTable *subscribe_cb_table; /* call id -> pims_ipc_cb_s; keys and values are released with g_free() */

        pthread_mutex_t data_queue_mutex;       /* protects data_queue */
        GList *data_queue;              /* list of pims_ipc_subscribe_data_s awaiting dispatch */

        pthread_mutex_t call_mutex; /* not to be interrupted while sending and receiving */
        int eventfd;                    /* eventfd used to wake the I/O thread out of epoll_wait(); -1 when unset */
} pims_ipc_s;
94
/* Number of live handles: incremented in __pims_ipc_create(), decremented in __pims_ipc_free_handle(). */
static unsigned int ref_cnt;
/* Subscribe-mode handles currently alive; consulted before dispatching notifications. */
static GList *subscribe_handles;
/* List of pims_ipc_server_disconnected_cb_t walked by __hung_up_cb(); it is populated outside this part of the file. */
static GList *disconnected_list;

/* A server-disconnected callback registration bound to one handle. */
typedef struct {
        pims_ipc_server_disconnected_cb callback;
        void *user_data;
        pims_ipc_s *handle;
} pims_ipc_server_disconnected_cb_t;

/* start deprecated */
static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
/* end deprecated */
/* Held by __hung_up_cb() while it walks disconnected_list and invokes a callback. */
static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
109
110 static void __sub_data_free(gpointer user_data)
111 {
112         pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
113         pims_ipc_data_destroy(data->handle);
114         free(data->call_id);
115         free(data);
116 }
117
118 static void __pims_ipc_free_handle(pims_ipc_s *handle)
119 {
120         pthread_mutex_lock(&__gmutex);
121
122         handle->epoll_stop_thread = TRUE;
123         if (handle->eventfd != -1)
124                 if (eventfd_write(handle->eventfd, 1) != 0)
125                         VERBOSE("Failed to write the event \n ");
126
127         if (handle->fd != -1)
128                 close(handle->fd);
129
130         pthread_mutex_unlock(&__gmutex);
131         if (handle->io_thread)
132                 pthread_join(handle->io_thread, NULL);
133         pthread_mutex_lock(&__gmutex);
134
135         g_free(handle->id);
136         g_free(handle->service);
137
138         if (handle->async_channel) {
139                 /* remove a subscriber handle from the golbal list */
140                 subscribe_handles = g_list_remove(subscribe_handles, handle);
141                 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
142
143                 g_source_remove(handle->async_source_id);
144                 g_io_channel_unref(handle->async_channel);
145         }
146
147         if (handle->subscribe_cb_table)
148                 g_hash_table_destroy(handle->subscribe_cb_table);
149
150         pthread_mutex_lock(&handle->data_queue_mutex);
151         if (handle->data_queue)
152                 g_list_free_full(handle->data_queue, __sub_data_free);
153
154         pthread_mutex_unlock(&handle->data_queue_mutex);
155         pthread_mutex_destroy(&handle->data_queue_mutex);
156
157         if (handle->subscribe_fd != -1)
158                 close(handle->subscribe_fd);
159
160         if (0 < handle->disconnected_source)
161                 g_source_remove(handle->disconnected_source);
162
163         pthread_mutex_destroy(&handle->call_mutex);
164         pthread_mutex_destroy(&handle->call_status_mutex);
165
166         g_free(handle);
167
168         if (--ref_cnt <= 0) {
169                 if (subscribe_handles)
170                         g_list_free(subscribe_handles);
171                 subscribe_handles = NULL;
172         }
173
174         pthread_mutex_unlock(&__gmutex);
175 }
176
177 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
178 {
179         pims_ipc_cb_s *cb_data = NULL;
180         uint64_t dummy;
181
182         do {
183                 read_command(handle->subscribe_fd, &dummy);
184
185                 pthread_mutex_lock(&handle->data_queue_mutex);
186                 if (!handle->data_queue) {
187                         pthread_mutex_unlock(&handle->data_queue_mutex);
188                         break;
189                 }
190
191                 GList *cursor = g_list_first(handle->data_queue);
192                 if (NULL == cursor || NULL == cursor->data) {
193                         pthread_mutex_unlock(&handle->data_queue_mutex);
194                         break;
195                 }
196                 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
197
198                 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
199                 if (cb_data == NULL)
200                         VERBOSE("unable to find %s", data->call_id);
201                 else
202                         cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
203
204                 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
205                 __sub_data_free(data);
206                 pthread_mutex_unlock(&handle->data_queue_mutex);
207         } while (1);
208
209         return 0;
210 }
211
212 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
213 {
214         pims_ipc_s *handle = (pims_ipc_s *)data;
215
216         if (condition & G_IO_HUP)
217                 return FALSE;
218
219         pthread_mutex_lock(&__gmutex);
220
221         /* check if a subscriber handle is exists */
222         if (g_list_find(subscribe_handles, handle) == NULL) {
223                 ERR("No such handle that ID is %p", handle);
224                 pthread_mutex_unlock(&__gmutex);
225                 return FALSE;
226         }
227
228         __pims_ipc_receive_for_subscribe(handle);
229
230         pthread_mutex_unlock(&__gmutex);
231
232         return TRUE;
233 }
234
/*
 * Return the next process-wide sequence number.
 *
 * The counter is seeded from time(NULL) on first use (and again whenever it
 * holds the sentinel 0xffffffff); every other call returns the previous
 * value plus one.
 */
static unsigned int __get_global_sequence_no()
{
        static unsigned int counter = 0xffffffff;

        counter = (counter == 0xffffffff) ? (unsigned int)time(NULL) : counter + 1;

        return counter;
}
245
246 static int __pims_ipc_send_identify(pims_ipc_s *handle)
247 {
248         unsigned int total_len, seq_no;
249         unsigned int client_id_len = strlen(handle->id);
250
251         total_len = sizeof(total_len) + sizeof(client_id_len)+client_id_len + sizeof(seq_no);
252
253         int length = 0;
254         char buf[total_len+1];
255         memset(buf, 0x0, total_len+1);
256
257         memcpy(buf, &total_len, sizeof(total_len));
258         length += sizeof(total_len);
259
260         memcpy(buf+length, &(client_id_len), sizeof(client_id_len));
261         length += sizeof(client_id_len);
262         memcpy(buf+length, handle->id, client_id_len);
263         length += client_id_len;
264
265         GET_CALL_SEQUNECE_NO(handle, seq_no);
266         memcpy(buf+length, &(seq_no), sizeof(seq_no));
267         length += sizeof(seq_no);
268
269         return socket_send(handle->fd, buf, length);
270 }
271
272 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
273 {
274         int ret;
275         gboolean is_ok = FALSE;
276         int len = 0;
277         pims_ipc_data_h data = NULL;
278         unsigned int seq_no = 0;
279         char *client_id = NULL;
280         char *call_id = NULL;
281         char *buf = NULL;
282
283         /* read the size of message. note that ioctl is non-blocking */
284         if (ioctl(handle->fd, FIONREAD, &len)) {
285                 ERR("ioctl failed: %d", errno);
286                 return -1;
287         }
288
289         /* when server or client closed socket */
290         if (len == 0) {
291                 ERR("[IPC Socket] connection is closed");
292                 return -1;
293         }
294
295         do {
296                 unsigned int read_len = 0;
297                 unsigned int total_len = 0;
298                 unsigned int client_id_len = 0;
299                 unsigned int call_id_len = 0;
300                 unsigned int has_data = FALSE;
301
302                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
303
304                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &(client_id_len), sizeof(client_id_len)));
305                 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
306                         client_id = calloc(1, client_id_len+1);
307                         if (client_id == NULL) {
308                                 ERR("calloc fail");
309                                 break;
310                         }
311                 } else
312                         break;
313                 ret = socket_recv(handle->fd, (void *)&client_id, client_id_len);
314                 if (ret < 0) {  ERR("socket_recv error"); break;        }
315                 read_len += ret;
316
317                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &seq_no, sizeof(seq_no)));
318                 if (total_len == read_len) {
319                         /* send identity */
320                         data = pims_ipc_data_create(0);
321                         if (NULL == data) {
322                                 ERR("pims_ipc_data_create() Fail");
323                                 break;
324                         }
325                         ret = pims_ipc_data_put(data, client_id, client_id_len);
326                         if (ret != 0)
327                                 WARN("pims_ipc_data_put fail(%d)", ret);
328                         break;
329                 }
330
331                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
332                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
333                         call_id = calloc(1, call_id_len+1);
334                         if (call_id == NULL) {
335                                 ERR("calloc fail");
336                                 break;
337                         }
338                 } else
339                         break;
340
341                 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
342                 if (ret < 0) {  ERR("socket_recv error"); break;        }
343                 read_len += ret;
344
345                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
346                 if (has_data) {
347                         unsigned int data_len;
348                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
349                         if (data_len > 0 && data_len < UINT_MAX-1) {
350                                 buf = calloc(1, data_len+1);
351                                 if (buf == NULL) {
352                                         ERR("calloc fail");
353                                         break;
354                                 }
355                         } else
356                                 break;
357                         ret = socket_recv(handle->fd, (void *)&buf, data_len);
358                         if (ret < 0) {  ERR("socket_recv error"); break;        }
359                         read_len += ret;
360
361                         data = pims_ipc_data_steal_unmarshal(buf, data_len);
362                         if (NULL == data) {
363                                 ERR("pims_ipc_data_steal_unmarshal() Fail");
364                                 break;
365                         }
366                 }
367
368                 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, seq_no);
369         } while (0);
370         free(client_id);
371         free(call_id);
372         free(buf);
373
374         if (seq_no == handle->call_sequence_no) {
375                 if (data_out != NULL)
376                         *data_out = data;
377                 else if (data)
378                         pims_ipc_data_destroy(data);
379                 is_ok = TRUE;
380         } else {
381                 if (data)
382                         pims_ipc_data_destroy(data);
383                 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, seq_no);
384         }
385
386         if (is_ok)
387                 return 0;
388
389         return -1;
390 }
391
392 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
393 {
394         int ret = -1;
395         struct pollfd pollfds[1];
396
397         pollfds[0].fd = handle->fd;
398         pollfds[0].events = POLLIN | POLLERR | POLLHUP;
399         pollfds[0].revents = 0;
400
401         while (1) {
402                 while (1) {
403                         ret = poll(pollfds, 1, 4500);
404                         if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
405                                 continue;
406
407                         break;
408                 }
409
410                 if (ret == 0){
411                         ERR("timedout errno[%d], revents[%d]", errno, pollfds[0].revents);
412                         return -1;
413                 }
414                 if (ret > 0) {
415                         if (pollfds[0].revents & (POLLERR|POLLHUP)) {
416                                 ERR("Server disconnected ret[%d]", ret);
417                                 ret = -1;
418                                 break;
419                         }
420                         if (pollfds[0].revents & POLLIN) {
421                                 ret = __pims_ipc_read_data(handle, data_out);
422                                 break;
423                         }
424                 }
425         }
426
427         return ret;
428 }
429
430 static int __open_subscribe_fd(pims_ipc_s *handle)
431 {
432         int ret;
433         int flags;
434
435         int subscribe_fd = eventfd(0, 0);
436         if (-1 == subscribe_fd) {
437                 ERR("eventfd error : %d", errno);
438                 return -1;
439         }
440         VERBOSE("subscribe :%d\n", subscribe_fd);
441
442         flags = fcntl(subscribe_fd, F_GETFL, 0);
443         if (flags == -1)
444                 flags = 0;
445
446         ret = fcntl(subscribe_fd, F_SETFL, flags | O_NONBLOCK);
447         if (0 != ret)
448                 ERR("fcntl() Fail(%d)", errno);
449
450         handle->subscribe_fd = subscribe_fd;
451         return 0;
452 }
453
454 static int __subscribe_data(pims_ipc_s * handle)
455 {
456         int len;
457         int ret = -1;
458         char *call_id = NULL;
459         char *buf = NULL;
460         pims_ipc_data_h dhandle = NULL;
461
462         do {
463                 /* read the size of message. note that ioctl is non-blocking */
464                 if (ioctl(handle->fd, FIONREAD, &len)) {
465                         ERR("ioctl failed: %d", errno);
466                         break;
467                 }
468
469                 /* when server or client closed socket */
470                 if (len == 0) {
471                         INFO("[IPC Socket] connection is closed");
472                         ret = -1;
473                         break;
474                 }
475
476                 unsigned int read_len = 0;
477                 unsigned int total_len = 0;
478                 unsigned int call_id_len = 0;
479                 unsigned int has_data = FALSE;
480
481                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
482
483                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
484                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
485                         call_id = calloc(1, call_id_len+1);
486                         if (call_id == NULL) {
487                                 ERR("calloc fail");
488                                 break;
489                         }
490                 } else
491                         break;
492
493                 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
494                 if (ret < 0) {  ERR("socket_recv error"); break; }
495                 read_len += ret;
496
497                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
498
499                 if (has_data) {
500                         unsigned int data_len;
501                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
502                         if (data_len > 0 && data_len < UINT_MAX-1) {
503                                 buf = calloc(1, data_len+1);
504                                 if (buf == NULL) {
505                                         ERR("calloc fail");
506                                         break;
507                                 }
508                         } else
509                                 break;
510                         ret = socket_recv(handle->fd, (void *)&buf, data_len);
511                         if (ret < 0) {
512                                 ERR("socket_recv error(%d)", ret);
513                                 break;
514                         }
515                         read_len += ret;
516
517                         dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
518                         if (NULL == dhandle) {
519                                 ERR("pims_ipc_data_steal_unmarshal() Fail");
520                                 break;
521                         }
522
523                         pims_ipc_subscribe_data_s *sub_data;
524                         sub_data = calloc(1, sizeof(pims_ipc_subscribe_data_s));
525                         if (NULL == sub_data) {
526                                 ERR("calloc() Fail");
527                                 pims_ipc_data_destroy(dhandle);
528                                 ret = -1;
529                                 break;
530                         }
531                         sub_data->handle = dhandle;
532                         sub_data->call_id = call_id;
533                         call_id = NULL;
534
535                         pthread_mutex_lock(&handle->data_queue_mutex);
536                         handle->data_queue = g_list_append(handle->data_queue, sub_data);
537                         pthread_mutex_unlock(&handle->data_queue_mutex);
538                         write_command(handle->subscribe_fd, 1);
539                 }
540                 ret = 0;
541         } while (0);
542
543         free(call_id);
544         free(buf);
545         return ret;
546 }
547
548 static gboolean __hung_up_cb(gpointer data)
549 {
550         GList *cursor = NULL;
551
552         if (NULL == disconnected_list) {
553                 DBG("No disconnected list");
554                 return FALSE;
555         }
556
557         pthread_mutex_lock(&__disconnect_cb_mutex);
558         cursor = g_list_first(disconnected_list);
559         while (cursor) {
560                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
561                 if (disconnected && disconnected->handle == data && disconnected->callback) {
562                         DBG("call hung_up callback");
563                         disconnected->callback(disconnected->user_data);
564                         break;
565                 }
566                 cursor = g_list_next(cursor);
567         }
568         pthread_mutex_unlock(&__disconnect_cb_mutex);
569
570         return FALSE;
571 }
572
573 static void* __io_thread(void *data)
574 {
575         pims_ipc_s *handle = data;
576         struct epoll_event ev = {0};
577         int ret;
578         int epfd;
579
580         epfd = epoll_create(MAX_EPOLL_EVENT);
581
582         pthread_mutex_lock(&__gmutex);
583
584         handle->eventfd = eventfd(0, EFD_NONBLOCK);
585         ev.data.fd = handle->eventfd;
586         ev.events = EPOLLIN;
587         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->eventfd, &ev);
588         WARN_IF(ret != 0, "listen error :%d", ret);
589
590         ev.events = EPOLLIN | EPOLLHUP;
591         ev.data.fd = handle->fd;
592         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
593         WARN_IF(ret != 0, "listen error :%d", ret);
594         pthread_mutex_unlock(&__gmutex);
595
596
597         while (1) {
598                 int i = 0;
599
600                 pthread_mutex_lock(&__gmutex);
601                 if (handle->epoll_stop_thread) {
602                         pthread_mutex_unlock(&__gmutex);
603                         break;
604                 }
605                 pthread_mutex_unlock(&__gmutex);
606
607                 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
608                 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1);
609
610                 pthread_mutex_lock(&__gmutex);
611
612                 if (handle->epoll_stop_thread) {
613                         DBG("__io_thread epoll_stop_thread");
614                         pthread_mutex_unlock(&__gmutex);
615                         break;
616                 }
617                 pthread_mutex_unlock(&__gmutex);
618
619                 if (event_num == -1) {
620                         if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
621                                 ERR("errno:%d\n", errno);
622                                 break;
623                         }
624                 }
625
626                 pthread_mutex_lock(&__gmutex);
627                 for (i = 0; i < event_num; i++) {
628                         if (events[i].events & EPOLLHUP) {
629                                 ERR("server fd closed");
630                                 handle->epoll_stop_thread = TRUE;
631                                 break;
632                         }
633
634                         if (events[i].events & EPOLLIN) {
635                                 if (__subscribe_data(handle) < 0) {
636                                         ERR("server fd closed");
637                                         g_idle_add(__hung_up_cb, handle);
638                                         handle->epoll_stop_thread = TRUE;
639                                         break;
640                                 }
641                         }
642                 }
643                 pthread_mutex_unlock(&__gmutex);
644         }
645
646         close(epfd);
647         close(handle->eventfd);
648
649         pthread_exit(NULL);
650 }
651
652 static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data)
653 {
654         if (G_IO_HUP & condition) {
655                 DBG("hung up");
656                 __hung_up_cb(data);
657                 return FALSE;
658
659         } else if (G_IO_IN & condition) {
660                 char buf[1] = {0};
661                 if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) {
662                         DBG("hung up");
663                         __hung_up_cb(data);
664                         return FALSE;
665                 }
666         } else {
667                 ERR("Invalid condition (%d)", condition);
668         }
669         return TRUE;
670 }
671
672 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
673 {
674         pims_ipc_s *handle = NULL;
675         gboolean is_ok = FALSE;
676
677         pthread_mutex_lock(&__gmutex);
678
679         do {
680                 struct sockaddr_un server_addr;
681                 int ret;
682
683                 ref_cnt++;
684                 VERBOSE("Create %d th..", ref_cnt);
685
686                 handle = g_new0(pims_ipc_s, 1);
687                 if (handle == NULL) {
688                         ERR("Failed to allocation");
689                         break;
690                 }
691
692                 handle->subscribe_fd = -1;
693                 handle->io_thread = 0;
694                 handle->eventfd = -1;
695                 handle->service = g_strdup(service);
696                 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
697                 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
698                 if (handle->fd < 0) {
699                         ERR("socket error : %d, errno: %d", handle->fd, errno);
700                         break;
701                 }
702                 int flags = fcntl(handle->fd, F_GETFL, 0);
703                 if (flags == -1)
704                         flags = 0;
705                 ret = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
706                 if (0 != ret)
707                         ERR("fcntl() Fail(%d)", errno);
708
709                 pthread_mutex_init(&handle->call_status_mutex, 0);
710
711                 pthread_mutex_lock(&handle->call_status_mutex);
712                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
713                 pthread_mutex_unlock(&handle->call_status_mutex);
714
715                 pthread_mutex_init(&handle->call_mutex, 0);
716
717                 bzero(&server_addr, sizeof(server_addr));
718                 server_addr.sun_family = AF_UNIX;
719                 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
720
721                 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
722                 if (ret != 0) {
723                         ERR("connect error : %d, errno: %d", ret, errno);
724                         break;
725                 }
726                 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
727                 DBG("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
728
729                 if (mode == PIMS_IPC_MODE_REQ) {
730                         GIOChannel *ch = g_io_channel_unix_new(handle->fd);
731                         handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP,
732                                         _g_io_hup_cb, handle);
733                         g_io_channel_unref(ch);
734
735                         handle->call_sequence_no = (unsigned int)time(NULL);
736                         ret = __pims_ipc_send_identify(handle);
737                         if (ret < 0) {
738                                 ERR("__pims_ipc_send_identify error");
739                                 break;
740                         }
741                         ret = __pims_ipc_receive(handle, NULL);
742                         if (ret < 0) {
743                                 ERR("__pims_ipc_receive error");
744                                 break;
745                         }
746
747                         if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0)
748                                 WARN("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
749
750                 } else {
751                         handle->epoll_stop_thread = FALSE;
752                         pthread_mutex_init(&handle->data_queue_mutex, 0);
753
754                         pthread_mutex_lock(&handle->data_queue_mutex);
755                         handle->data_queue = NULL;
756                         pthread_mutex_unlock(&handle->data_queue_mutex);
757
758                         ret = __open_subscribe_fd(handle);
759                         if (ret < 0)
760                                 break;
761
762                         pthread_t worker;
763                         ret = pthread_create(&worker, NULL, __io_thread, handle);
764                         if (ret != 0)
765                                 break;
766                         handle->io_thread  = worker;
767
768                         GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
769                         if (!async_channel) {
770                                 ERR("g_io_channel_unix_new error");
771                                 break;
772                         }
773                         handle->async_channel = async_channel;
774                         handle->async_source_id = g_io_add_watch(handle->async_channel,
775                                         G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
776                         handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal,
777                                         g_free, g_free);
778                         ASSERT(handle->subscribe_cb_table);
779
780                         /* add a subscriber handle to the global list */
781                         subscribe_handles = g_list_append(subscribe_handles, handle);
782                         VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
783                 }
784
785                 is_ok = TRUE;
786                 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
787         } while (0);
788
789         pthread_mutex_unlock(&__gmutex);
790
791         if (FALSE == is_ok) {
792                 if (handle) {
793                         __pims_ipc_free_handle(handle);
794                         handle = NULL;
795                 }
796         }
797
798         return handle;
799 }
800
/*
 * Create a request-mode handle connected to the given service socket.
 *
 * @param service  path of the service's unix-domain socket
 * @return the new handle, or NULL on failure
 */
API pims_ipc_h pims_ipc_create(char *service)
{
        return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
}
805
/*
 * Create a subscribe-mode handle connected to the given service socket.
 *
 * @param service  path of the service's unix-domain socket
 * @return the new handle, or NULL on failure
 */
API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
{
        return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
}
810
811 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
812 {
813         pims_ipc_s *handle = ipc;
814
815         if (mode == PIMS_IPC_MODE_REQ) {
816                 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY,
817                                 NULL, NULL) != 0) {
818                         WARN("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
819                 }
820         }
821
822         if (handle)
823                 __pims_ipc_free_handle(handle);
824 }
825
826 API void pims_ipc_destroy(pims_ipc_h ipc)
827 {
828         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
829 }
830
831 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
832 {
833         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
834 }
835
836 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
837 {
838         int ret = -1;
839         int length = 0;
840         unsigned int total_len;
841         unsigned int seq_no = 0;
842         gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
843         unsigned int call_id_len = strlen(call_id);
844         pims_ipc_data_s *data = NULL;
845         unsigned int has_data = FALSE;
846         unsigned int client_id_len = strlen(handle->id);
847
848         GET_CALL_SEQUNECE_NO(handle, seq_no);
849
850         int len = sizeof(total_len)     + sizeof(client_id_len) + client_id_len + sizeof(seq_no)
851                 + call_id_len + sizeof(call_id_len) + sizeof(has_data);
852         total_len = len;
853
854         if (data_in) {
855                 has_data = TRUE;
856                 data = data_in;
857                 len += sizeof(unsigned int);
858                 total_len = len + data->buf_size;
859         }
860
861         INFO("len(%d),client_id(%s),call_id(%s),seq_no(%d)", len, handle->id, call_id, seq_no);
862
863         char buf[len+1];
864         memset(buf, 0x0, len+1);
865
866         memcpy(buf, &total_len, sizeof(total_len));
867         length += sizeof(total_len);
868
869         client_id_len = strlen(handle->id);
870         memcpy(buf+length, &client_id_len, sizeof(client_id_len));
871         length += sizeof(client_id_len);
872         memcpy(buf+length, handle->id, client_id_len);
873         length += client_id_len;
874
875         memcpy(buf+length, &seq_no, sizeof(seq_no));
876         length += sizeof(seq_no);
877
878         memcpy(buf+length, &call_id_len, sizeof(call_id_len));
879         length += sizeof(call_id_len);
880         memcpy(buf+length, call_id, call_id_len);
881         length += call_id_len;
882         g_free(call_id);
883
884         memcpy(buf+length, &has_data, sizeof(has_data));
885         length += sizeof(has_data);
886
887         if (has_data) {
888                 memcpy(buf+length, &(data->buf_size), sizeof(data->buf_size));
889                 length += sizeof(data->buf_size);
890
891                 ret = socket_send(handle->fd, buf, length);
892                 if (ret > 0)
893                         ret = socket_send_data(handle->fd, data->buf, data->buf_size);
894         } else {
895                 ret = socket_send(handle->fd, buf, length);
896         }
897
898         if (ret < 0)
899                 return -1;
900
901         return 0;
902 }
903
904 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
905                 pims_ipc_data_h *data_out)
906 {
907         pims_ipc_s *handle = ipc;
908
909         RETV_IF(NULL == ipc, -1);
910         RETV_IF(NULL == module, -1);
911         RETV_IF(NULL == function, -1);
912
913         pthread_mutex_lock(&handle->call_status_mutex);
914         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
915                 pthread_mutex_unlock(&handle->call_status_mutex);
916                 ERR("the previous call is in progress : %p", ipc);
917                 return -1;
918         }
919         pthread_mutex_unlock(&handle->call_status_mutex);
920
921         int ret = 0;
922         pthread_mutex_lock(&handle->call_mutex);
923         do {
924                 ret = __pims_ipc_send(handle, module, function, data_in);
925                 if (0 != ret)
926                         break;
927
928                 ret = __pims_ipc_receive(handle, data_out);
929                 if (0 != ret)
930                         break;
931         } while (0);
932         pthread_mutex_unlock(&handle->call_mutex);
933
934         return ret;
935 }
936
937 static gboolean __call_async_idler_cb(gpointer data)
938 {
939         pims_ipc_s *handle = data;
940         pims_ipc_data_h dhandle;
941
942         RETV_IF(NULL == handle, FALSE);
943
944         dhandle = handle->dhandle_for_async_idler;
945         handle->dhandle_for_async_idler = NULL;
946
947         pthread_mutex_lock(&handle->call_status_mutex);
948         handle->call_status = PIMS_IPC_CALL_STATUS_READY;
949         pthread_mutex_unlock(&handle->call_status_mutex);
950
951         handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
952         pims_ipc_data_destroy(dhandle);
953
954         return FALSE;
955 }
956
957 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition,
958                 gpointer data)
959 {
960         pims_ipc_s *handle = data;
961         pims_ipc_data_h dhandle = NULL;
962
963         if (__pims_ipc_receive(handle, &dhandle) == 0) {
964                 VERBOSE("call status = %d", handle->call_status);
965
966                 pthread_mutex_lock(&handle->call_status_mutex);
967                 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
968                         pthread_mutex_unlock(&handle->call_status_mutex);
969                         pims_ipc_data_destroy(dhandle);
970                 } else {
971                         pthread_mutex_unlock(&handle->call_status_mutex);
972                         if (src == NULL) {    /* A response is arrived too quickly */
973                                 handle->dhandle_for_async_idler = dhandle;
974                                 g_idle_add(__call_async_idler_cb, handle);
975                         } else {
976                                 pthread_mutex_lock(&handle->call_status_mutex);
977                                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
978                                 pthread_mutex_unlock(&handle->call_status_mutex);
979
980                                 handle->call_async_callback((pims_ipc_h)handle, dhandle,
981                                                 handle->call_async_userdata);
982                                 pims_ipc_data_destroy(dhandle);
983                         }
984                 }
985         }
986         return FALSE;
987 }
988
989 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function,
990                 pims_ipc_data_h data_in, pims_ipc_call_async_cb callback, void *user_data)
991 {
992         pims_ipc_s *handle = ipc;
993         guint source_id = 0;
994
995         RETV_IF(NULL == ipc, -1);
996         RETV_IF(NULL == module, -1);
997         RETV_IF(NULL == function, -1);
998         RETV_IF(NULL == callback, -1);
999
1000         pthread_mutex_lock(&handle->call_status_mutex);
1001         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
1002                 pthread_mutex_unlock(&handle->call_status_mutex);
1003                 ERR("the previous call is in progress : %p", ipc);
1004                 return -1;
1005         }
1006         pthread_mutex_unlock(&handle->call_status_mutex);
1007
1008         pthread_mutex_lock(&handle->call_status_mutex);
1009         handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
1010         pthread_mutex_unlock(&handle->call_status_mutex);
1011
1012         handle->call_async_callback = callback;
1013         handle->call_async_userdata = user_data;
1014
1015         /* add a callback for GIOChannel */
1016         if (!handle->async_channel) {
1017                 handle->async_channel = g_io_channel_unix_new(handle->fd);
1018                 if (!handle->async_channel) {
1019                         ERR("g_io_channel_unix_new error");
1020                         return -1;
1021                 }
1022         }
1023
1024         source_id = g_io_add_watch(handle->async_channel, G_IO_IN,
1025                         __pims_ipc_call_async_handler, handle);
1026         handle->async_source_id = source_id;
1027
1028         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
1029                 g_source_remove(source_id);
1030                 return -1;
1031         }
1032
1033         __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
1034
1035         return 0;
1036 }
1037
1038 API int pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1039 {
1040         int ret;
1041         pims_ipc_s *handle = ipc;
1042
1043         RETV_IF(NULL == ipc, FALSE);
1044
1045         pthread_mutex_lock(&handle->call_status_mutex);
1046         if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1047                 ret = TRUE;
1048         else
1049                 ret = FALSE;
1050         pthread_mutex_unlock(&handle->call_status_mutex);
1051         return ret;
1052 }
1053
1054 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event,
1055                 pims_ipc_subscribe_cb callback, void *user_data)
1056 {
1057         gchar *call_id = NULL;
1058         pims_ipc_s *handle = ipc;
1059         pims_ipc_cb_s *cb_data = NULL;
1060
1061         RETV_IF(NULL == ipc, -1);
1062         RETV_IF(NULL == module, -1);
1063         RETV_IF(NULL == event, -1);
1064         RETV_IF(NULL == callback, -1);
1065         RETV_IF(NULL == handle->subscribe_cb_table, -1);
1066
1067         cb_data = g_new0(pims_ipc_cb_s, 1);
1068         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1069
1070         VERBOSE("subscribe cb id[%s]", call_id);
1071         cb_data->callback = callback;
1072         cb_data->user_data = user_data;
1073         g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
1074
1075         return 0;
1076 }
1077
1078 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1079 {
1080         gchar *call_id = NULL;
1081         pims_ipc_s *handle = ipc;
1082
1083         RETV_IF(NULL == ipc, -1);
1084         RETV_IF(NULL == module, -1);
1085         RETV_IF(NULL == event, -1);
1086         RETV_IF(NULL == handle->subscribe_cb_table, -1);
1087
1088         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1089
1090         VERBOSE("unsubscribe cb id[%s]", call_id);
1091
1092         if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1093                 ERR("g_hash_table_remove error");
1094                 g_free(call_id);
1095                 return -1;
1096         }
1097
1098         g_free(call_id);
1099         return 0;
1100 }
1101
1102 API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle,
1103                 pims_ipc_server_disconnected_cb callback, void *user_data)
1104 {
1105         GList *cursor = NULL;
1106
1107         /* check already existed */
1108         pthread_mutex_lock(&__disconnect_cb_mutex);
1109         cursor = g_list_first(disconnected_list);
1110         while (cursor) {
1111                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1112                 if (disconnected && disconnected->handle == handle) {
1113                         ERR("Already set callback");
1114                         pthread_mutex_unlock(&__disconnect_cb_mutex);
1115                         return -1;
1116                 }
1117                 cursor = g_list_next(cursor);
1118         }
1119         pthread_mutex_unlock(&__disconnect_cb_mutex);
1120
1121         /* append callback */
1122         pims_ipc_server_disconnected_cb_t *disconnected = NULL;
1123         disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t));
1124         if (NULL == disconnected) {
1125                 ERR("calloc() Fail");
1126                 return -1;
1127         }
1128         DBG("add disconnected");
1129         disconnected->handle = handle;
1130         disconnected->callback = callback;
1131         disconnected->user_data = user_data;
1132
1133         pthread_mutex_lock(&__disconnect_cb_mutex);
1134         disconnected_list = g_list_append(disconnected_list, disconnected);
1135         pthread_mutex_unlock(&__disconnect_cb_mutex);
1136
1137         return 0;
1138 }
1139
1140 API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle)
1141 {
1142         pthread_mutex_lock(&__disconnect_cb_mutex);
1143
1144         GList *cursor = NULL;
1145         cursor = g_list_first(disconnected_list);
1146         while (cursor) {
1147                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1148                 if (disconnected && disconnected->handle == handle) {
1149                         free(disconnected);
1150                         disconnected_list = g_list_delete_link(disconnected_list, cursor);
1151                         DBG("remove disconnected_cb");
1152                         break;
1153                 }
1154                 cursor = g_list_next(cursor);
1155         }
1156         pthread_mutex_unlock(&__disconnect_cb_mutex);
1157
1158         return 0;
1159 }
1160
1161 /* start deprecated */
1162 API int pims_ipc_unset_server_disconnected_cb()
1163 {
1164         pthread_mutex_lock(&__disconnect_cb_mutex);
1165         _server_disconnected_cb.callback = NULL;
1166         _server_disconnected_cb.user_data = NULL;
1167         pthread_mutex_unlock(&__disconnect_cb_mutex);
1168         return 0;
1169 }
1170
1171 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback,
1172                 void *user_data)
1173 {
1174         pthread_mutex_lock(&__disconnect_cb_mutex);
1175         _server_disconnected_cb.callback = callback;
1176         _server_disconnected_cb.user_data = user_data;
1177         pthread_mutex_unlock(&__disconnect_cb_mutex);
1178         return 0;
1179 }
1180 /* end deprecated */