5e6d31ffc6005a8fa1a5340bf6e1f6c9ae7d7726
[platform/core/pim/pims-ipc.git] / src / pims-ipc.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <time.h>
24 #include <glib.h>
25 #include <stdint.h>
26 #include <pthread.h>
27 #include <poll.h> // pollfds
28 #include <sys/un.h> // sockaddr_un
29 #include <sys/ioctl.h> // ioctl
30 #include <sys/socket.h> //socket
31 #include <sys/types.h>
32 #include <sys/epoll.h> // epoll
33 #include <sys/eventfd.h> // eventfd
34 #include <fcntl.h>
35 #include <errno.h>
36
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-debug.h"
40 #include "pims-ipc-data.h"
41 #include "pims-ipc-data-internal.h"
42 #include "pims-ipc.h"
43
/*
 * Pre-increment the handle's call_sequence_no and store the new value in
 * sequence_no. The macro itself takes no lock.
 */
#define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
        sequence_no = ++((handle)->call_sequence_no);\
} while (0)
47
/* File-wide lock taken by handle creation/teardown, the subscribe dispatch handler and the I/O thread. */
static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
49
/* Per-handle call state; a new handle starts in PIMS_IPC_CALL_STATUS_READY. */
typedef enum
{
        PIMS_IPC_CALL_STATUS_READY = 0,
        PIMS_IPC_CALL_STATUS_IN_PROGRESS
} pims_ipc_call_status_e;
55
/*
 * Kind of handle to create or destroy:
 * REQ - created by pims_ipc_create(), SUB - created by pims_ipc_create_for_subscribe().
 */
typedef enum
{
        PIMS_IPC_MODE_REQ = 0,
        PIMS_IPC_MODE_SUB
} pims_ipc_mode_e;
61
/* Value type looked up in subscribe_cb_table: a subscription callback plus its user data. */
typedef struct
{
        pims_ipc_subscribe_cb callback;
        void * user_data;
} pims_ipc_cb_s;
67
68 typedef struct
69 {
70         char *call_id;
71         pims_ipc_data_h *handle;
72 }pims_ipc_subscribe_data_s;
73
/* Internal representation of a pims_ipc_h handle (both REQ and SUB modes). */
typedef struct
{
        int fd;                         /* UNIX-domain socket connected to the service */
        char *service;                  /* copy of the socket path given at creation */
        char *id;                       /* client id, formatted as "<pid>:<sequence>" in hex */
        GIOChannel *async_channel;      /* SUB mode: channel wrapping subscribe_fd */
        guint disconnected_source;      /* REQ mode: watch id of the hang-up detector */
        guint async_source_id;          /* SUB mode: watch id of the subscribe handler */
        pthread_mutex_t call_status_mutex;
        pims_ipc_call_status_e call_status;
        unsigned int call_sequence_no;  /* bumped per sent message; replies must echo it */
        /* The next three fields are not used in this part of the file - presumably for async calls; verify. */
        pims_ipc_call_async_cb call_async_callback;
        void *call_async_userdata;
        pims_ipc_data_h dhandle_for_async_idler;

        int subscribe_fd;               /* SUB mode: eventfd written after an event is queued; -1 if unused */
        int epoll_stop_thread;          /* set to ask the I/O thread to leave its loop */
        pthread_t io_thread;            /* SUB mode: socket reader thread; 0 if not started */
        GHashTable *subscribe_cb_table; /* SUB mode: call id string -> pims_ipc_cb_s */

        pthread_mutex_t data_queue_mutex; /* protects data_queue */
        GList *data_queue;              /* pending pims_ipc_subscribe_data_s items */
} pims_ipc_s;
97
/* Number of handles created and not yet freed. */
static unsigned int ref_cnt;
/* Every live SUB-mode handle; the subscribe handler validates its handle against this list. */
static GList *subscribe_handles;
/* List of pims_ipc_server_disconnected_cb_t entries scanned when a hang-up is detected. */
static GList *disconnected_list;

/* A server-disconnected callback registration bound to one handle. */
typedef struct {
        pims_ipc_server_disconnected_cb callback;
        void *user_data;
        pims_ipc_s *handle;
} pims_ipc_server_disconnected_cb_t;

/* start deprecated */
static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
/* end deprecated */
/* Held while disconnected_list is walked in the hang-up callback. */
static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
112
113 static void __sub_data_free(gpointer user_data)
114 {
115         pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
116         pims_ipc_data_destroy(data->handle);
117         free(data->call_id);
118         free(data);
119 }
120
121 static void __pims_ipc_free_handle(pims_ipc_s *handle)
122 {
123         pthread_mutex_lock(&__gmutex);
124
125         handle->epoll_stop_thread = true;
126
127         if (handle->fd != -1)
128                 close(handle->fd);
129
130         pthread_mutex_unlock(&__gmutex);
131         if (handle->io_thread)
132                 pthread_join(handle->io_thread, NULL);
133         pthread_mutex_lock(&__gmutex);
134
135         g_free(handle->id);
136         g_free(handle->service);
137
138         if (handle->async_channel) {
139                 // remove a subscriber handle from the golbal list
140                 subscribe_handles = g_list_remove(subscribe_handles, handle);
141                 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
142
143                 g_source_remove(handle->async_source_id);
144                 g_io_channel_unref(handle->async_channel);
145         }
146
147         if (handle->subscribe_cb_table)
148                 g_hash_table_destroy(handle->subscribe_cb_table);
149
150         pthread_mutex_lock(&handle->data_queue_mutex);
151         if (handle->data_queue) {
152                 g_list_free_full(handle->data_queue, __sub_data_free);
153         }
154         pthread_mutex_unlock(&handle->data_queue_mutex);
155         pthread_mutex_destroy(&handle->data_queue_mutex);
156
157         if (handle->subscribe_fd != -1)
158                 close(handle->subscribe_fd);
159
160         g_source_remove(handle->disconnected_source);
161         pthread_mutex_destroy(&handle->call_status_mutex);
162
163         g_free(handle);
164
165         if (--ref_cnt <= 0) {
166                 if (subscribe_handles)
167                         g_list_free(subscribe_handles);
168                 subscribe_handles = NULL;
169         }
170
171         pthread_mutex_unlock(&__gmutex);
172 }
173
174 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
175 {
176         pims_ipc_cb_s *cb_data = NULL;
177         uint64_t dummy;
178
179         do {
180                 read_command(handle->subscribe_fd, &dummy);
181
182                 pthread_mutex_lock(&handle->data_queue_mutex);
183                 if (!handle->data_queue) {
184                         pthread_mutex_unlock(&handle->data_queue_mutex);
185                         break;
186                 }
187
188                 GList *cursor = g_list_first(handle->data_queue);
189                 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
190                 if (data == NULL) {
191                         pthread_mutex_unlock(&handle->data_queue_mutex);
192                         break;
193                 }
194
195                 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
196                 if (cb_data == NULL) {
197                         VERBOSE("unable to find %s", call_id);
198                 }
199                 else
200                         cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
201
202                 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
203                 __sub_data_free(data);
204                 pthread_mutex_unlock(&handle->data_queue_mutex);
205         } while(1);
206
207         return 0;
208 }
209
210 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
211 {
212         pims_ipc_s *handle = (pims_ipc_s *)data;
213
214         VERBOSE("");
215
216         if (condition & G_IO_HUP)
217                 return FALSE;
218
219         pthread_mutex_lock(&__gmutex);
220
221         // check if a subscriber handle is exists
222         if (g_list_find(subscribe_handles, handle) == NULL) {
223                 ERROR("No such handle that ID is %p", handle);
224                 pthread_mutex_unlock(&__gmutex);
225                 return FALSE;
226         }
227
228         __pims_ipc_receive_for_subscribe(handle);
229
230         pthread_mutex_unlock(&__gmutex);
231
232         return TRUE;
233 }
234
/*
 * Produce the next value of a process-wide counter.
 *
 * The counter is seeded from time(NULL) whenever it holds the sentinel
 * 0xffffffff (its initial value); otherwise it is advanced by one.
 * The function does no locking of its own.
 */
static unsigned int __get_global_sequence_no()
{
        static unsigned int counter = 0xffffffff;

        counter = (0xffffffff == counter)
                ? (unsigned int)time(NULL)
                : counter + 1;

        return counter;
}
245
246 static int __pims_ipc_send_identify(pims_ipc_s *handle)
247 {
248         unsigned int sequence_no;
249         unsigned int client_id_len = strlen(handle->id);
250         unsigned int len = sizeof(unsigned int)         // total size
251                 + client_id_len + sizeof(unsigned int)          // client_id
252                 + sizeof(unsigned int)  ;       // seq_no
253
254         char buf[len+1];
255         int length = 0;
256         memset(buf, 0x0, len+1);
257
258         // total len
259         memcpy(buf, (void*)&len, sizeof(unsigned int));
260         length += sizeof(unsigned int);
261
262         // client_id
263         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
264         length += sizeof(unsigned int);
265         memcpy(buf+length, (void*)(handle->id), client_id_len);
266         length += client_id_len;
267
268         // seq_no
269         GET_CALL_SEQUNECE_NO(handle, sequence_no);
270         memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
271         length += sizeof(unsigned int);
272
273         return socket_send(handle->fd, buf, length);
274 }
275
276 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
277 {
278         int ret;
279         gboolean is_ok = FALSE;
280         int len = 0;
281         pims_ipc_data_h data = NULL;
282         unsigned int sequence_no = 0;
283         char *client_id = NULL;
284         char *call_id = NULL;
285         char *buf = NULL;
286
287         /* read the size of message. note that ioctl is non-blocking */
288         if (ioctl(handle->fd, FIONREAD, &len)) {
289                 ERROR("ioctl failed: %d", errno);
290                 return -1;
291         }
292
293         /* when server or client closed socket */
294         if (len == 0) {
295                 ERROR("[IPC Socket] connection is closed");
296                 return -1;
297         }
298
299         do {
300                 unsigned int read_len = 0;
301                 unsigned int total_len = 0;
302                 unsigned int client_id_len = 0;
303                 unsigned int call_id_len = 0;
304                 unsigned int is_data = FALSE;
305
306                 // get total_len
307                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
308
309                 // client_id
310                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(client_id_len), sizeof(unsigned int)));
311                 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
312                         client_id = calloc(1, client_id_len+1);
313                         if (client_id == NULL) {
314                                 ERROR("calloc fail");
315                                 break;
316                         }
317                 }
318                 else
319                         break;
320                 ret = socket_recv(handle->fd, (void *)&(client_id), client_id_len);
321                 if (ret < 0) {  ERROR("socket_recv error"); break;      }
322                 read_len += ret;
323
324                 // sequence no
325                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(sequence_no), sizeof(unsigned int)));
326                 if (total_len == read_len) {
327                         // send identity
328                         data = pims_ipc_data_create(0);
329                         if (NULL == data) {
330                                 ERROR("pims_ipc_data_create() Fail");
331                                 break;
332                         }
333                         ret = pims_ipc_data_put(data, client_id, client_id_len);
334                         if (ret != 0)
335                                 WARNING("pims_ipc_data_put fail(%d)", ret);
336                         break;
337                 }
338
339                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
340                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
341                         call_id = calloc(1, call_id_len+1);
342                         if (call_id == NULL) {
343                                 ERROR("calloc fail");
344                                 break;
345                         }
346                 }
347                 else
348                         break;
349
350                 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
351                 if (ret < 0) {  ERROR("socket_recv error"); break;      }
352                 read_len += ret;
353
354                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
355                 if (is_data) {
356                         unsigned int data_len;
357                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
358                         if (data_len > 0 && data_len < UINT_MAX-1) {
359                                 buf = calloc(1, data_len+1);
360                                 if (buf == NULL) {
361                                         ERROR("calloc fail");
362                                         break;
363                                 }
364                         }
365                         else
366                                 break;
367                         ret = socket_recv(handle->fd, (void *)&(buf), data_len);
368                         if (ret < 0) {  ERROR("socket_recv error"); break;      }
369                         read_len += ret;
370
371                         data = pims_ipc_data_steal_unmarshal(buf, data_len);
372                         if (NULL == data) {
373                                 ERROR("pims_ipc_data_steal_unmarshal() Fail");
374                                 break;
375                         }
376                         buf = NULL;
377                 }
378
379                 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, sequence_no);
380         } while(0);
381         free(client_id);
382         free(call_id);
383         free(buf);
384
385         if (sequence_no == handle->call_sequence_no) {
386                 if (data_out != NULL) {
387                         *data_out = data;
388                 }
389                 else if (data)
390                         pims_ipc_data_destroy(data);
391                 is_ok = TRUE;
392         }
393         else {
394                 if (data)
395                         pims_ipc_data_destroy(data);
396                 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, sequence_no);
397         }
398
399         if (is_ok)
400                 return 0;
401
402         return -1;
403 }
404
405 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
406 {
407         int ret = -1;
408         struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof (struct pollfd));
409         if (NULL == pollfds) {
410                 ERROR("calloc() Fail");
411                 return -1;
412         }
413
414         pollfds[0].fd = handle->fd;
415         pollfds[0].events = POLLIN | POLLERR | POLLHUP;
416
417         while(1) {
418                 while(1) {
419                         ret = poll(pollfds, 1, 1000);
420                         if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)) {
421                                 continue;
422                         }
423                         break;
424                 }
425
426                 if (ret > 0) {
427                         if (pollfds[0].revents & (POLLERR|POLLHUP)) {
428                                 ERROR("Server disconnected");
429                                 ret = -1;
430                                 break;
431                         }
432                         if (pollfds[0].revents & POLLIN) {
433                                 ret = __pims_ipc_read_data(handle, data_out);
434                                 break;
435                         }
436                 }
437         }
438         free (pollfds);
439         return ret;
440 }
441
442 static int __open_subscribe_fd(pims_ipc_s *handle)
443 {
444         // router inproc eventfd
445         int subscribe_fd = eventfd(0,0);
446         int flags;
447         int ret;
448
449         if (-1 == subscribe_fd) {
450                 ERROR("eventfd error : %d", errno);
451                 return -1;
452         }
453         VERBOSE("subscribe :%d\n", subscribe_fd);
454
455         flags = fcntl (subscribe_fd, F_GETFL, 0);
456         if (flags == -1)
457                 flags = 0;
458         ret = fcntl (subscribe_fd, F_SETFL, flags | O_NONBLOCK);
459         if (0 != ret)
460                 VERBOSE("subscribe fcntl : %d\n", ret);
461
462         handle->subscribe_fd = subscribe_fd;
463         return 0;
464 }
465
466 static int __subscribe_data(pims_ipc_s * handle)
467 {
468         int len;
469         int ret = -1;
470         char *call_id = NULL;
471         char *buf = NULL;
472         pims_ipc_data_h dhandle = NULL;
473
474         do {
475                 /* read the size of message. note that ioctl is non-blocking */
476                 if (ioctl(handle->fd, FIONREAD, &len)) {
477                         ERROR("ioctl failed: %d", errno);
478                         break;
479                 }
480
481                 /* when server or client closed socket */
482                 if (len == 0) {
483                         INFO("[IPC Socket] connection is closed");
484                         ret = -1;
485                         break;
486                 }
487
488                 unsigned int read_len = 0;
489                 unsigned int total_len = 0;
490                 unsigned int call_id_len = 0;
491                 unsigned int is_data = FALSE;
492
493                 // get total_len
494                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, (void *)&total_len, sizeof(unsigned int)));
495
496                 // call_id
497                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(call_id_len), sizeof(unsigned int)));
498                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
499                         call_id = calloc(1, call_id_len+1);
500                         if (call_id == NULL) {
501                                 ERROR("calloc fail");
502                                 break;
503                         }
504                 }
505                 else
506                         break;
507
508                 ret = socket_recv(handle->fd, (void *)&(call_id), call_id_len);
509                 if (ret < 0) {  ERROR("socket_recv error"); break; }
510                 read_len += ret;
511
512                 // is_data
513                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(is_data), sizeof(unsigned int)));
514
515                 if (is_data) {
516                         unsigned int data_len;
517                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, (void *)&(data_len), sizeof(unsigned int)));
518                         if (data_len > 0 && data_len < UINT_MAX-1) {
519                                 buf = calloc(1, data_len+1);
520                                 if (buf == NULL) {
521                                         ERROR("calloc fail");
522                                         break;
523                                 }
524                         }
525                         else
526                                 break;
527                         ret = socket_recv(handle->fd, (void *)&(buf), data_len);
528                         if (ret < 0) {
529                                 ERROR("socket_recv error");
530                                 break;
531                         }
532                         read_len += ret;
533
534                         dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
535                         if (NULL == dhandle) {
536                                 ERROR("pims_ipc_data_steal_unmarshal() Fail");
537                                 break;
538                         }
539                         buf = NULL;
540
541                         pims_ipc_subscribe_data_s *sub_data = (pims_ipc_subscribe_data_s *)calloc(1, sizeof(pims_ipc_subscribe_data_s));
542                         if (NULL == sub_data) {
543                                 ERROR("calloc() Fail");
544                                 pims_ipc_data_destroy(dhandle);
545                                 ret = -1;
546                                 break;
547                         }
548                         sub_data->handle = dhandle;
549                         sub_data->call_id = call_id;
550                         call_id = NULL;
551
552                         pthread_mutex_lock(&handle->data_queue_mutex);
553                         handle->data_queue = g_list_append(handle->data_queue, sub_data);
554                         pthread_mutex_unlock(&handle->data_queue_mutex);
555                         write_command(handle->subscribe_fd, 1);
556                 }
557                 ret = 0;
558         } while(0);
559
560         free(call_id);
561         free(buf);
562         return ret;
563 }
564
565 static gboolean __hung_up_cb(gpointer data)
566 {
567         GList *cursor = NULL;
568
569         if (NULL == disconnected_list) {
570                 DEBUG("No disconnected list");
571                 return FALSE;
572         }
573
574         pthread_mutex_lock(&__disconnect_cb_mutex);
575         cursor = g_list_first(disconnected_list);
576         while (cursor) {
577                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
578                 if (disconnected && disconnected->handle == data && disconnected->callback) {
579                         DEBUG("call hung_up callback");
580                         disconnected->callback(disconnected->user_data);
581                         break;
582                 }
583                 cursor = g_list_next(cursor);
584         }
585         pthread_mutex_unlock(&__disconnect_cb_mutex);
586
587         return FALSE;
588 }
589
590 static void* __io_thread(void *data)
591 {
592         pims_ipc_s *handle = data;
593         struct epoll_event ev = {0};
594         int ret;
595         int epfd;
596
597         epfd = epoll_create(MAX_EPOLL_EVENT);
598
599         pthread_mutex_lock(&__gmutex);
600
601         ev.events = EPOLLIN | EPOLLHUP;
602         ev.data.fd = handle->fd;
603
604         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
605         WARN_IF(ret != 0, "listen error :%d", ret);
606         pthread_mutex_unlock(&__gmutex);
607
608
609         while (1) {
610                 int i = 0;
611
612                 pthread_mutex_lock(&__gmutex);
613
614                 if (handle->epoll_stop_thread) {
615                         pthread_mutex_unlock(&__gmutex);
616                         break;
617                 }
618                 pthread_mutex_unlock(&__gmutex);
619
620                 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
621                 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
622
623                 pthread_mutex_lock(&__gmutex);
624
625                 if (handle->epoll_stop_thread) {
626                         pthread_mutex_unlock(&__gmutex);
627                         break;
628                 }
629                 pthread_mutex_unlock(&__gmutex);
630
631                 if (event_num == -1) {
632                         if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
633                                 ERROR("errno:%d\n", errno);
634                                 break;
635                         }
636                 }
637
638                 pthread_mutex_lock(&__gmutex);
639                 for (i = 0; i < event_num; i++) {
640                         if (events[i].events & EPOLLHUP) {
641                                 ERROR("server fd closed");
642                                 handle->epoll_stop_thread = true;
643                                 break;
644                         }
645
646                         if (events[i].events & EPOLLIN) {
647                                 if(__subscribe_data(handle) < 0) {
648                                         ERROR("server fd closed");
649                                         g_idle_add(__hung_up_cb, handle);
650                                         handle->epoll_stop_thread = true;
651                                         break;
652                                 }
653                         }
654                 }
655                 pthread_mutex_unlock(&__gmutex);
656         }
657
658         close(epfd);
659
660         pthread_exit(NULL);
661 }
662
663 static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data)
664 {
665         if (G_IO_HUP & condition) {
666                 DEBUG("hung up");
667                 __hung_up_cb(data);
668                 return FALSE;
669         } else if (G_IO_IN & condition) {
670                 char buf[1] = {0};
671                 if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) {
672                         DEBUG("hung up");
673                         __hung_up_cb(data);
674                         return FALSE;
675                 }
676         } else {
677                 ERROR("Invalid condition (%d)", condition);
678         }
679         return TRUE;
680 }
681
682 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
683 {
684         pims_ipc_s *handle = NULL;
685         gboolean is_ok = FALSE;
686
687         pthread_mutex_lock(&__gmutex);
688
689         do {
690                 struct sockaddr_un server_addr;
691                 int ret;
692
693                 ref_cnt++;
694                 VERBOSE("Create %d th..", ref_cnt);
695
696                 handle = g_new0(pims_ipc_s, 1);
697                 if (handle == NULL) {
698                         ERROR("Failed to allocation");
699                         break;
700                 }
701
702                 handle->subscribe_fd = -1;
703                 handle->io_thread = 0;
704                 handle->service = g_strdup(service);
705                 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
706                 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
707                 if (handle->fd < 0) {
708                         ERROR("socket error : %d, errno: %d", handle->fd, errno);
709                         break;
710                 }
711                 int flags = fcntl (handle->fd, F_GETFL, 0);
712                 if (flags == -1)
713                         flags = 0;
714                 ret = fcntl (handle->fd, F_SETFL, flags | O_NONBLOCK);
715                 VERBOSE("socket fcntl : %d\n", ret);
716
717                 pthread_mutex_init(&handle->call_status_mutex, 0);
718
719                 pthread_mutex_lock(&handle->call_status_mutex);
720                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
721                 pthread_mutex_unlock(&handle->call_status_mutex);
722
723                 bzero(&server_addr, sizeof(server_addr));
724                 server_addr.sun_family = AF_UNIX;
725                 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
726
727                 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
728                 if (ret != 0) {
729                         ERROR("connect error : %d, errno: %d", ret, errno);
730                         break;
731                 }
732                 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
733
734                 if (mode == PIMS_IPC_MODE_REQ) {
735                         GIOChannel *ch = g_io_channel_unix_new(handle->fd);
736                         handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP, _g_io_hup_cb, handle);
737
738                         handle->call_sequence_no = (unsigned int)time(NULL);
739                         ret = __pims_ipc_send_identify(handle);
740                         if (ret < 0) {
741                                 ERROR("__pims_ipc_send_identify error");
742                                 break;
743                         }
744                         __pims_ipc_receive(handle, NULL);
745
746                         if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0) {
747                                 WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
748                         }
749                 }
750                 else {
751                         handle->epoll_stop_thread = false;
752                         pthread_mutex_init(&handle->data_queue_mutex, 0);
753
754                         pthread_mutex_lock(&handle->data_queue_mutex);
755                         handle->data_queue = NULL;
756                         pthread_mutex_unlock(&handle->data_queue_mutex);
757
758                         ret = __open_subscribe_fd(handle);
759                         if (ret < 0)
760                                 break;
761
762                         pthread_t worker;
763                         ret = pthread_create(&worker, NULL, __io_thread, handle);
764                         if (ret != 0)
765                                 break;
766                         handle->io_thread  = worker;
767
768                         GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
769                         if (!async_channel) {
770                                 ERROR("g_io_channel_unix_new error");
771                                 break;
772                         }
773                         handle->async_channel = async_channel;
774                         handle->async_source_id = g_io_add_watch(handle->async_channel, G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
775                         handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
776                         ASSERT(handle->subscribe_cb_table);
777
778                         // add a subscriber handle to the global list
779                         subscribe_handles = g_list_append(subscribe_handles, handle);
780                         VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
781                 }
782
783                 is_ok = TRUE;
784                 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
785         } while(0);
786
787         pthread_mutex_unlock(&__gmutex);
788
789         if (FALSE == is_ok) {
790                 if (handle) {
791                         __pims_ipc_free_handle(handle);
792                         handle = NULL;
793                 }
794         }
795
796         return handle;
797 }
798
799 API pims_ipc_h pims_ipc_create(char *service)
800 {
801         return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
802 }
803
804 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
805 {
806         return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
807 }
808
809 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
810 {
811         pims_ipc_s *handle = (pims_ipc_s *)ipc;
812
813         if (mode == PIMS_IPC_MODE_REQ) {
814                 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY, NULL, NULL) != 0) {
815                         WARNING("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
816                 }
817         }
818
819         if (handle)
820                 __pims_ipc_free_handle(handle);
821 }
822
823 API void pims_ipc_destroy(pims_ipc_h ipc)
824 {
825         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
826 }
827
828 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
829 {
830         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
831 }
832
833 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
834 {
835         int ret = -1;
836         unsigned int sequence_no = 0;
837         gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
838         unsigned int call_id_len = strlen(call_id);
839         pims_ipc_data_s *data = NULL;
840         unsigned int is_data = FALSE;
841         unsigned int client_id_len = strlen(handle->id);
842         int length = 0;
843
844         GET_CALL_SEQUNECE_NO(handle, sequence_no);
845
846         int len = sizeof(unsigned int)                                          // total size
847                 + client_id_len + sizeof(unsigned int)  // client_id
848                 + sizeof(unsigned int)                                          // seq_no
849                 + call_id_len + sizeof(unsigned int)    // call_id
850                 + sizeof(unsigned int);                                         // is data
851
852         int total_len = len;
853
854         if (data_in) {
855                 is_data = TRUE;
856                 data = (pims_ipc_data_s*)data_in;
857                 len += sizeof(unsigned int);
858                 total_len = len + data->buf_size;
859         }
860
861         INFO("len : %d, client_id : %s, call_id : %s, seq_no :%d", len, handle->id, call_id, sequence_no);
862
863         char buf[len+1];
864
865         memset(buf, 0x0, len+1);
866
867         memcpy(buf, (void*)&total_len, sizeof(unsigned int));
868         length += sizeof(unsigned int);
869
870         // client_id
871         client_id_len = strlen(handle->id);
872         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
873         length += sizeof(unsigned int);
874         memcpy(buf+length, (void*)(handle->id), client_id_len);
875         length += client_id_len;
876
877         // seq_no
878         memcpy(buf+length, (void*)&(sequence_no), sizeof(unsigned int));
879         length += sizeof(unsigned int);
880
881         // call id
882         memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
883         length += sizeof(unsigned int);
884         memcpy(buf+length, (void*)(call_id), call_id_len);
885         length += call_id_len;
886         g_free(call_id);
887
888         // is_data
889         memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
890         length += sizeof(unsigned int);
891
892         if (is_data) {
893                 memcpy(buf+length, (void*)&(data->buf_size), sizeof(unsigned int));
894                 length += sizeof(unsigned int);
895
896                 ret = socket_send(handle->fd, buf, length);
897                 if (ret > 0)
898                         ret = socket_send_data(handle->fd, data->buf, data->buf_size);
899         }
900         else {
901                 ret = socket_send(handle->fd, buf, length);
902         }
903
904         if (ret < 0)
905                 return -1;
906
907         return 0;
908 }
909
910 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
911                 pims_ipc_data_h *data_out)
912 {
913         pims_ipc_s *handle = (pims_ipc_s *)ipc;
914
915
916         if (ipc == NULL) {
917                 ERROR("invalid handle : %p", ipc);
918                 return -1;
919         }
920
921         if (!module || !function) {
922                 ERROR("invalid argument");
923                 return -1;
924         }
925
926         pthread_mutex_lock(&handle->call_status_mutex);
927         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
928                 pthread_mutex_unlock(&handle->call_status_mutex);
929                 ERROR("the previous call is in progress : %p", ipc);
930                 return -1;
931         }
932         pthread_mutex_unlock(&handle->call_status_mutex);
933
934
935         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
936                 return -1;
937         }
938
939         if (__pims_ipc_receive(handle, data_out) != 0) {
940                 return -1;
941         }
942
943         return 0;
944 }
945
946 static gboolean __call_async_idler_cb(gpointer data)
947 {
948         VERBOSE("");
949
950         pims_ipc_s *handle = (pims_ipc_s *)data;
951         ASSERT(handle);
952         ASSERT(handle->dhandle_for_async_idler);
953         pims_ipc_data_h dhandle = handle->dhandle_for_async_idler;
954         handle->dhandle_for_async_idler = NULL;
955
956         pthread_mutex_lock(&handle->call_status_mutex);
957         handle->call_status = PIMS_IPC_CALL_STATUS_READY;
958         pthread_mutex_unlock(&handle->call_status_mutex);
959
960         handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
961         pims_ipc_data_destroy(dhandle);
962
963         return FALSE;
964 }
965
966 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition, gpointer data)
967 {
968         pims_ipc_s *handle = (pims_ipc_s *)data;
969         pims_ipc_data_h dhandle = NULL;
970
971         if (__pims_ipc_receive(handle, &dhandle) == 0) {
972                 VERBOSE("call status = %d", handle->call_status);
973
974                 pthread_mutex_lock(&handle->call_status_mutex);
975                 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
976                         pthread_mutex_unlock(&handle->call_status_mutex);
977                         pims_ipc_data_destroy(dhandle);
978                 }
979                 else {
980                         pthread_mutex_unlock(&handle->call_status_mutex);
981                         if (src == NULL) {    // A response is arrived too quickly
982                                 handle->dhandle_for_async_idler = dhandle;
983                                 g_idle_add(__call_async_idler_cb, handle);
984                         }
985                         else {
986                                 pthread_mutex_lock(&handle->call_status_mutex);
987                                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
988                                 pthread_mutex_unlock(&handle->call_status_mutex);
989
990                                 handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
991                                 pims_ipc_data_destroy(dhandle);
992                         }
993                 }
994         }
995         return FALSE;
996 }
997
998 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
999                 pims_ipc_call_async_cb callback, void *userdata)
1000 {
1001         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1002         guint source_id = 0;
1003
1004         if (ipc == NULL) {
1005                 ERROR("invalid handle : %p", ipc);
1006                 return -1;
1007         }
1008
1009         if (!module || !function || !callback) {
1010                 ERROR("invalid argument");
1011                 return -1;
1012         }
1013
1014         pthread_mutex_lock(&handle->call_status_mutex);
1015         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
1016                 pthread_mutex_unlock(&handle->call_status_mutex);
1017                 ERROR("the previous call is in progress : %p", ipc);
1018                 return -1;
1019         }
1020         pthread_mutex_unlock(&handle->call_status_mutex);
1021
1022         pthread_mutex_lock(&handle->call_status_mutex);
1023         handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
1024         pthread_mutex_unlock(&handle->call_status_mutex);
1025
1026         handle->call_async_callback = callback;
1027         handle->call_async_userdata = userdata;
1028
1029         // add a callback for GIOChannel
1030         if (!handle->async_channel) {
1031                 handle->async_channel = g_io_channel_unix_new(handle->fd);
1032                 if (!handle->async_channel) {
1033                         ERROR("g_io_channel_unix_new error");
1034                         return -1;
1035                 }
1036         }
1037
1038         source_id = g_io_add_watch(handle->async_channel, G_IO_IN, __pims_ipc_call_async_handler, handle);
1039         handle->async_source_id = source_id;
1040
1041         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
1042                 g_source_remove(source_id);
1043                 return -1;
1044         }
1045
1046         __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
1047
1048         return 0;
1049 }
1050
1051 API bool pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1052 {
1053         int ret;
1054         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1055
1056         if (ipc == NULL) {
1057                 ERROR("invalid handle : %p", ipc);
1058                 return false;
1059         }
1060
1061         pthread_mutex_lock(&handle->call_status_mutex);
1062         if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1063                 ret = true;
1064         else
1065                 ret = false;
1066         pthread_mutex_unlock(&handle->call_status_mutex);
1067         return ret;
1068 }
1069
1070 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event, pims_ipc_subscribe_cb callback, void *userdata)
1071 {
1072         gchar *call_id = NULL;
1073         pims_ipc_cb_s *cb_data = NULL;
1074         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1075
1076         if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1077                 ERROR("invalid handle : %p", ipc);
1078                 return -1;
1079         }
1080
1081         if (!module || !event || !callback) {
1082                 ERROR("invalid argument");
1083                 return -1;
1084         }
1085
1086         cb_data = g_new0(pims_ipc_cb_s, 1);
1087         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1088
1089         VERBOSE("subscribe cb id[%s]", call_id);
1090         cb_data->callback = callback;
1091         cb_data->user_data = userdata;
1092         g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
1093
1094         return 0;
1095 }
1096
1097 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1098 {
1099         gchar *call_id = NULL;
1100         pims_ipc_s *handle = (pims_ipc_s *)ipc;
1101
1102         if (ipc == NULL || handle->subscribe_cb_table == NULL) {
1103                 ERROR("invalid handle : %p", ipc);
1104                 return -1;
1105         }
1106
1107         if (!module || !event) {
1108                 ERROR("invalid argument");
1109                 return -1;
1110         }
1111
1112         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1113
1114         VERBOSE("unsubscribe cb id[%s]", call_id);
1115
1116         if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1117                 ERROR("g_hash_table_remove error");
1118                 g_free(call_id);
1119                 return -1;
1120         }
1121
1122         g_free(call_id);
1123         return 0;
1124 }
1125
1126 API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle, pims_ipc_server_disconnected_cb callback, void *user_data)
1127 {
1128         GList *cursor = NULL;
1129
1130         /* check already existed */
1131         pthread_mutex_lock(&__disconnect_cb_mutex);
1132         cursor = g_list_first(disconnected_list);
1133         while (cursor) {
1134                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1135                 if (disconnected && disconnected->handle == handle) {
1136                         ERROR("Already set callback");
1137                         pthread_mutex_unlock(&__disconnect_cb_mutex);
1138                         return -1;
1139                 }
1140                 cursor = g_list_next(cursor);
1141         }
1142         pthread_mutex_unlock(&__disconnect_cb_mutex);
1143
1144         /* append callback */
1145         pims_ipc_server_disconnected_cb_t *disconnected = NULL;
1146         disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t));
1147         if (NULL == disconnected) {
1148                 ERROR("Calloc() Fail");
1149                 return -1;
1150         }
1151         DEBUG("add disconnected");
1152         disconnected->handle = handle;
1153         disconnected->callback = callback;
1154         disconnected->user_data = user_data;
1155
1156         pthread_mutex_lock(&__disconnect_cb_mutex);
1157         disconnected_list = g_list_append(disconnected_list, disconnected);
1158         pthread_mutex_unlock(&__disconnect_cb_mutex);
1159
1160         return 0;
1161 }
1162
1163 API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle)
1164 {
1165         pthread_mutex_lock(&__disconnect_cb_mutex);
1166
1167         GList *cursor = NULL;
1168         cursor = g_list_first(disconnected_list);
1169         while (cursor) {
1170                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1171                 if (disconnected && disconnected->handle == handle) {
1172                         free(disconnected);
1173                         disconnected_list = g_list_delete_link(disconnected_list, cursor);
1174                         DEBUG("remove disconnected_cb");
1175                         break;
1176                 }
1177                 cursor = g_list_next(cursor);
1178         }
1179         pthread_mutex_unlock(&__disconnect_cb_mutex);
1180
1181         return 0;
1182 }
1183
1184 /* start deprecated */
1185 API int pims_ipc_unset_server_disconnected_cb()
1186 {
1187         pthread_mutex_lock(&__disconnect_cb_mutex);
1188         _server_disconnected_cb.callback = NULL;
1189         _server_disconnected_cb.user_data = NULL;
1190         pthread_mutex_unlock(&__disconnect_cb_mutex);
1191         return 0;
1192 }
1193
1194 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback, void *user_data)
1195 {
1196         pthread_mutex_lock(&__disconnect_cb_mutex);
1197         _server_disconnected_cb.callback = callback;
1198         _server_disconnected_cb.user_data = user_data;
1199         pthread_mutex_unlock(&__disconnect_cb_mutex);
1200         return 0;
1201 }
1202 /* end deprecated */