clean up: devied files, remove router
[platform/core/pim/pims-ipc.git] / src / pims-ipc.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <time.h>
24 #include <glib.h>
25 #include <stdint.h>
26 #include <pthread.h>
27 #include <poll.h>       /* pollfds */
28 #include <sys/un.h>     /* sockaddr_un */
29 #include <sys/ioctl.h>  /* ioctl */
30 #include <sys/socket.h> /* socket */
31 #include <sys/types.h>
32 #include <sys/epoll.h>  /* epoll */
33 #include <sys/eventfd.h> /* eventfd */
34 #include <fcntl.h>
35 #include <errno.h>
36
37 #include "pims-internal.h"
38 #include "pims-socket.h"
39 #include "pims-ipc-data.h"
40 #include "pims-ipc-data-internal.h"
41 #include "pims-ipc.h"
42
43 #define GET_CALL_SEQUNECE_NO(handle, sequence_no) do {\
44         sequence_no = ++((handle)->call_sequence_no);\
45 } while (0)
46
47 static pthread_mutex_t __gmutex = PTHREAD_MUTEX_INITIALIZER;
48
49 typedef enum {
50         PIMS_IPC_CALL_STATUS_READY = 0,
51         PIMS_IPC_CALL_STATUS_IN_PROGRESS
52 } pims_ipc_call_status_e;
53
54 typedef enum {
55         PIMS_IPC_MODE_REQ = 0,
56         PIMS_IPC_MODE_SUB
57 } pims_ipc_mode_e;
58
59 typedef struct {
60         pims_ipc_subscribe_cb callback;
61         void * user_data;
62 } pims_ipc_cb_s;
63
64 typedef struct {
65         char *call_id;
66         pims_ipc_data_h *handle;
67 } pims_ipc_subscribe_data_s;
68
69 typedef struct {
70         int fd;
71         char *service;
72         char *id;
73         GIOChannel *async_channel;
74         guint disconnected_source;
75         guint async_source_id;
76         pthread_mutex_t call_status_mutex;
77         pims_ipc_call_status_e call_status;
78         unsigned int call_sequence_no;
79         pims_ipc_call_async_cb call_async_callback;
80         void *call_async_userdata;
81         pims_ipc_data_h dhandle_for_async_idler;
82
83         int subscribe_fd;
84         int epoll_stop_thread;
85         pthread_t io_thread;
86         GHashTable *subscribe_cb_table;
87
88         pthread_mutex_t data_queue_mutex;
89         GList *data_queue;
90 } pims_ipc_s;
91
92 static unsigned int ref_cnt;
93 static GList *subscribe_handles;
94 static GList *disconnected_list;
95
96 typedef struct {
97         pims_ipc_server_disconnected_cb callback;
98         void *user_data;
99         pims_ipc_s *handle;
100 } pims_ipc_server_disconnected_cb_t;
101
102 /* start deprecated */
103 static pims_ipc_server_disconnected_cb_t _server_disconnected_cb = {NULL, NULL};
104 /* end deprecated */
105 static pthread_mutex_t __disconnect_cb_mutex = PTHREAD_MUTEX_INITIALIZER;
106
107 static void __sub_data_free(gpointer user_data)
108 {
109         pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s*)user_data;
110         pims_ipc_data_destroy(data->handle);
111         free(data->call_id);
112         free(data);
113 }
114
115 static void __pims_ipc_free_handle(pims_ipc_s *handle)
116 {
117         pthread_mutex_lock(&__gmutex);
118
119         handle->epoll_stop_thread = TRUE;
120
121         if (handle->fd != -1)
122                 close(handle->fd);
123
124         pthread_mutex_unlock(&__gmutex);
125         if (handle->io_thread)
126                 pthread_join(handle->io_thread, NULL);
127         pthread_mutex_lock(&__gmutex);
128
129         g_free(handle->id);
130         g_free(handle->service);
131
132         if (handle->async_channel) {
133                 /* remove a subscriber handle from the golbal list */
134                 subscribe_handles = g_list_remove(subscribe_handles, handle);
135                 VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
136
137                 g_source_remove(handle->async_source_id);
138                 g_io_channel_unref(handle->async_channel);
139         }
140
141         if (handle->subscribe_cb_table)
142                 g_hash_table_destroy(handle->subscribe_cb_table);
143
144         pthread_mutex_lock(&handle->data_queue_mutex);
145         if (handle->data_queue)
146                 g_list_free_full(handle->data_queue, __sub_data_free);
147
148         pthread_mutex_unlock(&handle->data_queue_mutex);
149         pthread_mutex_destroy(&handle->data_queue_mutex);
150
151         if (handle->subscribe_fd != -1)
152                 close(handle->subscribe_fd);
153
154         if (0 < handle->disconnected_source)
155                 g_source_remove(handle->disconnected_source);
156
157         pthread_mutex_destroy(&handle->call_status_mutex);
158
159         g_free(handle);
160
161         if (--ref_cnt <= 0) {
162                 if (subscribe_handles)
163                         g_list_free(subscribe_handles);
164                 subscribe_handles = NULL;
165         }
166
167         pthread_mutex_unlock(&__gmutex);
168 }
169
170 static int __pims_ipc_receive_for_subscribe(pims_ipc_s *handle)
171 {
172         pims_ipc_cb_s *cb_data = NULL;
173         uint64_t dummy;
174
175         do {
176                 read_command(handle->subscribe_fd, &dummy);
177
178                 pthread_mutex_lock(&handle->data_queue_mutex);
179                 if (!handle->data_queue) {
180                         pthread_mutex_unlock(&handle->data_queue_mutex);
181                         break;
182                 }
183
184                 GList *cursor = g_list_first(handle->data_queue);
185                 pims_ipc_subscribe_data_s *data = (pims_ipc_subscribe_data_s *)cursor->data;
186                 if (data == NULL) {
187                         pthread_mutex_unlock(&handle->data_queue_mutex);
188                         break;
189                 }
190
191                 cb_data = (pims_ipc_cb_s*)g_hash_table_lookup(handle->subscribe_cb_table, data->call_id);
192                 if (cb_data == NULL)
193                         VERBOSE("unable to find %s", call_id);
194                 else
195                         cb_data->callback((pims_ipc_h)handle, data->handle, cb_data->user_data);
196
197                 handle->data_queue = g_list_delete_link(handle->data_queue, cursor);
198                 __sub_data_free(data);
199                 pthread_mutex_unlock(&handle->data_queue_mutex);
200         } while (1);
201
202         return 0;
203 }
204
205 static gboolean __pims_ipc_subscribe_handler(GIOChannel *src, GIOCondition condition, gpointer data)
206 {
207         pims_ipc_s *handle = (pims_ipc_s *)data;
208
209         if (condition & G_IO_HUP)
210                 return FALSE;
211
212         pthread_mutex_lock(&__gmutex);
213
214         /* check if a subscriber handle is exists */
215         if (g_list_find(subscribe_handles, handle) == NULL) {
216                 ERR("No such handle that ID is %p", handle);
217                 pthread_mutex_unlock(&__gmutex);
218                 return FALSE;
219         }
220
221         __pims_ipc_receive_for_subscribe(handle);
222
223         pthread_mutex_unlock(&__gmutex);
224
225         return TRUE;
226 }
227
228 static unsigned int __get_global_sequence_no()
229 {
230         static unsigned int __gsequence_no = 0xffffffff;
231
232         if (__gsequence_no == 0xffffffff)
233                 __gsequence_no = (unsigned int)time(NULL);
234         else
235                 __gsequence_no++;
236         return __gsequence_no;
237 }
238
239 static int __pims_ipc_send_identify(pims_ipc_s *handle)
240 {
241         unsigned int total_len, seq_no;
242         unsigned int client_id_len = strlen(handle->id);
243
244         total_len = sizeof(total_len) + sizeof(client_id_len)+client_id_len + sizeof(seq_no);
245
246         int length = 0;
247         char buf[total_len+1];
248         memset(buf, 0x0, total_len+1);
249
250         memcpy(buf, &total_len, sizeof(total_len));
251         length += sizeof(total_len);
252
253         memcpy(buf+length, &(client_id_len), sizeof(client_id_len));
254         length += sizeof(client_id_len);
255         memcpy(buf+length, handle->id, client_id_len);
256         length += client_id_len;
257
258         GET_CALL_SEQUNECE_NO(handle, seq_no);
259         memcpy(buf+length, &(seq_no), sizeof(seq_no));
260         length += sizeof(seq_no);
261
262         return socket_send(handle->fd, buf, length);
263 }
264
265 static int __pims_ipc_read_data(pims_ipc_s *handle, pims_ipc_data_h *data_out)
266 {
267         int ret;
268         gboolean is_ok = FALSE;
269         int len = 0;
270         pims_ipc_data_h data = NULL;
271         unsigned int seq_no = 0;
272         char *client_id = NULL;
273         char *call_id = NULL;
274         char *buf = NULL;
275
276         /* read the size of message. note that ioctl is non-blocking */
277         if (ioctl(handle->fd, FIONREAD, &len)) {
278                 ERR("ioctl failed: %d", errno);
279                 return -1;
280         }
281
282         /* when server or client closed socket */
283         if (len == 0) {
284                 ERR("[IPC Socket] connection is closed");
285                 return -1;
286         }
287
288         do {
289                 unsigned int read_len = 0;
290                 unsigned int total_len = 0;
291                 unsigned int client_id_len = 0;
292                 unsigned int call_id_len = 0;
293                 unsigned int has_data = FALSE;
294
295                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
296
297                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &(client_id_len), sizeof(client_id_len)));
298                 if (client_id_len > 0 && client_id_len < UINT_MAX-1) {
299                         client_id = calloc(1, client_id_len+1);
300                         if (client_id == NULL) {
301                                 ERR("calloc fail");
302                                 break;
303                         }
304                 } else
305                         break;
306                 ret = socket_recv(handle->fd, (void *)&client_id, client_id_len);
307                 if (ret < 0) {  ERR("socket_recv error"); break;        }
308                 read_len += ret;
309
310                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &seq_no, sizeof(seq_no)));
311                 if (total_len == read_len) {
312                         /* send identity */
313                         data = pims_ipc_data_create(0);
314                         if (NULL == data) {
315                                 ERR("pims_ipc_data_create() Fail");
316                                 break;
317                         }
318                         ret = pims_ipc_data_put(data, client_id, client_id_len);
319                         if (ret != 0)
320                                 WARN("pims_ipc_data_put fail(%d)", ret);
321                         break;
322                 }
323
324                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
325                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
326                         call_id = calloc(1, call_id_len+1);
327                         if (call_id == NULL) {
328                                 ERR("calloc fail");
329                                 break;
330                         }
331                 } else
332                         break;
333
334                 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
335                 if (ret < 0) {  ERR("socket_recv error"); break;        }
336                 read_len += ret;
337
338                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
339                 if (has_data) {
340                         unsigned int data_len;
341                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
342                         if (data_len > 0 && data_len < UINT_MAX-1) {
343                                 buf = calloc(1, data_len+1);
344                                 if (buf == NULL) {
345                                         ERR("calloc fail");
346                                         break;
347                                 }
348                         } else
349                                 break;
350                         ret = socket_recv(handle->fd, (void *)&buf, data_len);
351                         if (ret < 0) {  ERR("socket_recv error"); break;        }
352                         read_len += ret;
353
354                         data = pims_ipc_data_steal_unmarshal(buf, data_len);
355                         if (NULL == data) {
356                                 ERR("pims_ipc_data_steal_unmarshal() Fail");
357                                 break;
358                         }
359                 }
360
361                 INFO("client_id :%s, call_id : %s, seq_no : %d", client_id, call_id, seq_no);
362         } while (0);
363         free(client_id);
364         free(call_id);
365         free(buf);
366
367         if (seq_no == handle->call_sequence_no) {
368                 if (data_out != NULL)
369                         *data_out = data;
370                 else if (data)
371                         pims_ipc_data_destroy(data);
372                 is_ok = TRUE;
373         } else {
374                 if (data)
375                         pims_ipc_data_destroy(data);
376                 VERBOSE("received an mismatched response (%x:%x)", handle->call_sequence_no, seq_no);
377         }
378
379         if (is_ok)
380                 return 0;
381
382         return -1;
383 }
384
385 static int __pims_ipc_receive(pims_ipc_s *handle, pims_ipc_data_h *data_out)
386 {
387         int ret = -1;
388         struct pollfd pollfds[1];
389
390         pollfds[0].fd = handle->fd;
391         pollfds[0].events = POLLIN | POLLERR | POLLHUP;
392
393         while (1) {
394                 while (1) {
395                         ret = poll(pollfds, 1, 1000);
396                         if (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
397                                 continue;
398
399                         break;
400                 }
401
402                 if (ret > 0) {
403                         if (pollfds[0].revents & (POLLERR|POLLHUP)) {
404                                 ERR("Server disconnected");
405                                 ret = -1;
406                                 break;
407                         }
408                         if (pollfds[0].revents & POLLIN) {
409                                 ret = __pims_ipc_read_data(handle, data_out);
410                                 break;
411                         }
412                 }
413         }
414
415         return ret;
416 }
417
418 static int __open_subscribe_fd(pims_ipc_s *handle)
419 {
420         int ret;
421         int flags;
422
423         int subscribe_fd = eventfd(0, 0);
424         if (-1 == subscribe_fd) {
425                 ERR("eventfd error : %d", errno);
426                 return -1;
427         }
428         VERBOSE("subscribe :%d\n", subscribe_fd);
429
430         flags = fcntl(subscribe_fd, F_GETFL, 0);
431         if (flags == -1)
432                 flags = 0;
433
434         ret = fcntl(subscribe_fd, F_SETFL, flags | O_NONBLOCK);
435         if (0 != ret)
436                 ERR("fcntl() Fail(%d)", errno);
437
438         handle->subscribe_fd = subscribe_fd;
439         return 0;
440 }
441
442 static int __subscribe_data(pims_ipc_s * handle)
443 {
444         int len;
445         int ret = -1;
446         char *call_id = NULL;
447         char *buf = NULL;
448         pims_ipc_data_h dhandle = NULL;
449
450         do {
451                 /* read the size of message. note that ioctl is non-blocking */
452                 if (ioctl(handle->fd, FIONREAD, &len)) {
453                         ERR("ioctl failed: %d", errno);
454                         break;
455                 }
456
457                 /* when server or client closed socket */
458                 if (len == 0) {
459                         INFO("[IPC Socket] connection is closed");
460                         ret = -1;
461                         break;
462                 }
463
464                 unsigned int read_len = 0;
465                 unsigned int total_len = 0;
466                 unsigned int call_id_len = 0;
467                 unsigned int has_data = FALSE;
468
469                 read_len = TEMP_FAILURE_RETRY(read(handle->fd, &total_len, sizeof(total_len)));
470
471                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &call_id_len, sizeof(call_id_len)));
472                 if (call_id_len > 0 && call_id_len < UINT_MAX-1) {
473                         call_id = calloc(1, call_id_len+1);
474                         if (call_id == NULL) {
475                                 ERR("calloc fail");
476                                 break;
477                         }
478                 } else
479                         break;
480
481                 ret = socket_recv(handle->fd, (void *)&call_id, call_id_len);
482                 if (ret < 0) {  ERR("socket_recv error"); break; }
483                 read_len += ret;
484
485                 read_len += TEMP_FAILURE_RETRY(read(handle->fd, &has_data, sizeof(has_data)));
486
487                 if (has_data) {
488                         unsigned int data_len;
489                         read_len += TEMP_FAILURE_RETRY(read(handle->fd, &data_len, sizeof(data_len)));
490                         if (data_len > 0 && data_len < UINT_MAX-1) {
491                                 buf = calloc(1, data_len+1);
492                                 if (buf == NULL) {
493                                         ERR("calloc fail");
494                                         break;
495                                 }
496                         } else
497                                 break;
498                         ret = socket_recv(handle->fd, (void *)&buf, data_len);
499                         if (ret < 0) {
500                                 ERR("socket_recv error(%d)", ret);
501                                 break;
502                         }
503                         read_len += ret;
504
505                         dhandle = pims_ipc_data_steal_unmarshal(buf, data_len);
506                         if (NULL == dhandle) {
507                                 ERR("pims_ipc_data_steal_unmarshal() Fail");
508                                 break;
509                         }
510
511                         pims_ipc_subscribe_data_s *sub_data;
512                         sub_data = calloc(1, sizeof(pims_ipc_subscribe_data_s));
513                         if (NULL == sub_data) {
514                                 ERR("calloc() Fail");
515                                 pims_ipc_data_destroy(dhandle);
516                                 ret = -1;
517                                 break;
518                         }
519                         sub_data->handle = dhandle;
520                         sub_data->call_id = call_id;
521                         call_id = NULL;
522
523                         pthread_mutex_lock(&handle->data_queue_mutex);
524                         handle->data_queue = g_list_append(handle->data_queue, sub_data);
525                         pthread_mutex_unlock(&handle->data_queue_mutex);
526                         write_command(handle->subscribe_fd, 1);
527                 }
528                 ret = 0;
529         } while (0);
530
531         free(call_id);
532         free(buf);
533         return ret;
534 }
535
536 static gboolean __hung_up_cb(gpointer data)
537 {
538         GList *cursor = NULL;
539
540         if (NULL == disconnected_list) {
541                 DBG("No disconnected list");
542                 return FALSE;
543         }
544
545         pthread_mutex_lock(&__disconnect_cb_mutex);
546         cursor = g_list_first(disconnected_list);
547         while (cursor) {
548                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
549                 if (disconnected && disconnected->handle == data && disconnected->callback) {
550                         DBG("call hung_up callback");
551                         disconnected->callback(disconnected->user_data);
552                         break;
553                 }
554                 cursor = g_list_next(cursor);
555         }
556         pthread_mutex_unlock(&__disconnect_cb_mutex);
557
558         return FALSE;
559 }
560
561 static void* __io_thread(void *data)
562 {
563         pims_ipc_s *handle = data;
564         struct epoll_event ev = {0};
565         int ret;
566         int epfd;
567
568         epfd = epoll_create(MAX_EPOLL_EVENT);
569
570         pthread_mutex_lock(&__gmutex);
571
572         ev.events = EPOLLIN | EPOLLHUP;
573         ev.data.fd = handle->fd;
574
575         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, handle->fd, &ev);
576         WARN_IF(ret != 0, "listen error :%d", ret);
577         pthread_mutex_unlock(&__gmutex);
578
579
580         while (1) {
581                 int i = 0;
582
583                 pthread_mutex_lock(&__gmutex);
584                 if (handle->epoll_stop_thread) {
585                         pthread_mutex_unlock(&__gmutex);
586                         break;
587                 }
588                 pthread_mutex_unlock(&__gmutex);
589
590                 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
591                 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, 50);
592
593                 pthread_mutex_lock(&__gmutex);
594
595                 if (handle->epoll_stop_thread) {
596                         pthread_mutex_unlock(&__gmutex);
597                         break;
598                 }
599                 pthread_mutex_unlock(&__gmutex);
600
601                 if (event_num == -1) {
602                         if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
603                                 ERR("errno:%d\n", errno);
604                                 break;
605                         }
606                 }
607
608                 pthread_mutex_lock(&__gmutex);
609                 for (i = 0; i < event_num; i++) {
610                         if (events[i].events & EPOLLHUP) {
611                                 ERR("server fd closed");
612                                 handle->epoll_stop_thread = TRUE;
613                                 break;
614                         }
615
616                         if (events[i].events & EPOLLIN) {
617                                 if (__subscribe_data(handle) < 0) {
618                                         ERR("server fd closed");
619                                         g_idle_add(__hung_up_cb, handle);
620                                         handle->epoll_stop_thread = TRUE;
621                                         break;
622                                 }
623                         }
624                 }
625                 pthread_mutex_unlock(&__gmutex);
626         }
627
628         close(epfd);
629
630         pthread_exit(NULL);
631 }
632
633 static gboolean _g_io_hup_cb(GIOChannel *src, GIOCondition condition, gpointer data)
634 {
635         if (G_IO_HUP & condition) {
636                 DBG("hung up");
637                 __hung_up_cb(data);
638                 return FALSE;
639
640         } else if (G_IO_IN & condition) {
641                 char buf[1] = {0};
642                 if (0 == recv(((pims_ipc_s *)data)->fd, buf, sizeof(buf), MSG_PEEK)) {
643                         DBG("hung up");
644                         __hung_up_cb(data);
645                         return FALSE;
646                 }
647         } else {
648                 ERR("Invalid condition (%d)", condition);
649         }
650         return TRUE;
651 }
652
653 static pims_ipc_h __pims_ipc_create(char *service, pims_ipc_mode_e mode)
654 {
655         pims_ipc_s *handle = NULL;
656         gboolean is_ok = FALSE;
657
658         pthread_mutex_lock(&__gmutex);
659
660         do {
661                 struct sockaddr_un server_addr;
662                 int ret;
663
664                 ref_cnt++;
665                 VERBOSE("Create %d th..", ref_cnt);
666
667                 handle = g_new0(pims_ipc_s, 1);
668                 if (handle == NULL) {
669                         ERR("Failed to allocation");
670                         break;
671                 }
672
673                 handle->subscribe_fd = -1;
674                 handle->io_thread = 0;
675                 handle->service = g_strdup(service);
676                 handle->id = g_strdup_printf("%x:%x", getpid(), __get_global_sequence_no());
677                 handle->fd = socket(PF_UNIX, SOCK_STREAM, 0);
678                 if (handle->fd < 0) {
679                         ERR("socket error : %d, errno: %d", handle->fd, errno);
680                         break;
681                 }
682                 int flags = fcntl(handle->fd, F_GETFL, 0);
683                 if (flags == -1)
684                         flags = 0;
685                 ret = fcntl(handle->fd, F_SETFL, flags | O_NONBLOCK);
686                 if (0 != ret)
687                         ERR("fcntl() Fail(%d)", errno);
688
689                 pthread_mutex_init(&handle->call_status_mutex, 0);
690
691                 pthread_mutex_lock(&handle->call_status_mutex);
692                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
693                 pthread_mutex_unlock(&handle->call_status_mutex);
694
695                 bzero(&server_addr, sizeof(server_addr));
696                 server_addr.sun_family = AF_UNIX;
697                 snprintf(server_addr.sun_path, sizeof(server_addr.sun_path), "%s", handle->service);
698
699                 ret = connect(handle->fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
700                 if (ret != 0) {
701                         ERR("connect error : %d, errno: %d", ret, errno);
702                         break;
703                 }
704                 VERBOSE("connect to server : socket:%s, client_sock:%d, %d\n", handle->service, handle->fd, ret);
705
706                 if (mode == PIMS_IPC_MODE_REQ) {
707                         GIOChannel *ch = g_io_channel_unix_new(handle->fd);
708                         handle->disconnected_source = g_io_add_watch(ch, G_IO_IN|G_IO_HUP,
709                                         _g_io_hup_cb, handle);
710                         g_io_channel_unref(ch);
711
712                         handle->call_sequence_no = (unsigned int)time(NULL);
713                         ret = __pims_ipc_send_identify(handle);
714                         if (ret < 0) {
715                                 ERR("__pims_ipc_send_identify error");
716                                 break;
717                         }
718                         __pims_ipc_receive(handle, NULL);
719
720                         if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_CREATE, NULL, NULL) != 0)
721                                 WARN("pims_ipc_call(PIMS_IPC_FUNCTION_CREATE) failed");
722
723                 } else {
724                         handle->epoll_stop_thread = FALSE;
725                         pthread_mutex_init(&handle->data_queue_mutex, 0);
726
727                         pthread_mutex_lock(&handle->data_queue_mutex);
728                         handle->data_queue = NULL;
729                         pthread_mutex_unlock(&handle->data_queue_mutex);
730
731                         ret = __open_subscribe_fd(handle);
732                         if (ret < 0)
733                                 break;
734
735                         pthread_t worker;
736                         ret = pthread_create(&worker, NULL, __io_thread, handle);
737                         if (ret != 0)
738                                 break;
739                         handle->io_thread  = worker;
740
741                         GIOChannel *async_channel = g_io_channel_unix_new(handle->subscribe_fd);
742                         if (!async_channel) {
743                                 ERR("g_io_channel_unix_new error");
744                                 break;
745                         }
746                         handle->async_channel = async_channel;
747                         handle->async_source_id = g_io_add_watch(handle->async_channel,
748                                         G_IO_IN|G_IO_HUP, __pims_ipc_subscribe_handler, handle);
749                         handle->subscribe_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal,
750                                         g_free, g_free);
751                         ASSERT(handle->subscribe_cb_table);
752
753                         /* add a subscriber handle to the global list */
754                         subscribe_handles = g_list_append(subscribe_handles, handle);
755                         VERBOSE("the count of subscribe handles = %d", g_list_length(subscribe_handles));
756                 }
757
758                 is_ok = TRUE;
759                 VERBOSE("A new handle is created : %s, %s", handle->service, handle->id);
760         } while (0);
761
762         pthread_mutex_unlock(&__gmutex);
763
764         if (FALSE == is_ok) {
765                 if (handle) {
766                         __pims_ipc_free_handle(handle);
767                         handle = NULL;
768                 }
769         }
770
771         return handle;
772 }
773
774 API pims_ipc_h pims_ipc_create(char *service)
775 {
776         return __pims_ipc_create(service, PIMS_IPC_MODE_REQ);
777 }
778
779 API pims_ipc_h pims_ipc_create_for_subscribe(char *service)
780 {
781         return __pims_ipc_create(service, PIMS_IPC_MODE_SUB);
782 }
783
784 static void __pims_ipc_destroy(pims_ipc_h ipc, pims_ipc_mode_e mode)
785 {
786         pims_ipc_s *handle = ipc;
787
788         if (mode == PIMS_IPC_MODE_REQ) {
789                 if (pims_ipc_call(handle, PIMS_IPC_MODULE_INTERNAL, PIMS_IPC_FUNCTION_DESTROY,
790                                 NULL, NULL) != 0) {
791                         WARN("pims_ipc_call(PIMS_IPC_FUNCTION_DESTROY) failed");
792                 }
793         }
794
795         if (handle)
796                 __pims_ipc_free_handle(handle);
797 }
798
799 API void pims_ipc_destroy(pims_ipc_h ipc)
800 {
801         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_REQ);
802 }
803
804 API void pims_ipc_destroy_for_subscribe(pims_ipc_h ipc)
805 {
806         __pims_ipc_destroy(ipc, PIMS_IPC_MODE_SUB);
807 }
808
809 static int __pims_ipc_send(pims_ipc_s *handle, char *module, char *function, pims_ipc_data_h data_in)
810 {
811         int ret = -1;
812         int length = 0;
813         unsigned int total_len;
814         unsigned int seq_no = 0;
815         gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
816         unsigned int call_id_len = strlen(call_id);
817         pims_ipc_data_s *data = NULL;
818         unsigned int has_data = FALSE;
819         unsigned int client_id_len = strlen(handle->id);
820
821         GET_CALL_SEQUNECE_NO(handle, seq_no);
822
823         int len = sizeof(total_len)     + sizeof(client_id_len) + client_id_len + sizeof(seq_no)
824                 + call_id_len + sizeof(call_id_len) + sizeof(has_data);
825         total_len = len;
826
827         if (data_in) {
828                 has_data = TRUE;
829                 data = data_in;
830                 len += sizeof(unsigned int);
831                 total_len = len + data->buf_size;
832         }
833
834         INFO("len(%d),client_id(%s),call_id(%s),seq_no(%d)", len, handle->id, call_id, seq_no);
835
836         char buf[len+1];
837         memset(buf, 0x0, len+1);
838
839         memcpy(buf, &total_len, sizeof(total_len));
840         length += sizeof(total_len);
841
842         client_id_len = strlen(handle->id);
843         memcpy(buf+length, &client_id_len, sizeof(client_id_len));
844         length += sizeof(client_id_len);
845         memcpy(buf+length, handle->id, client_id_len);
846         length += client_id_len;
847
848         memcpy(buf+length, &seq_no, sizeof(seq_no));
849         length += sizeof(seq_no);
850
851         memcpy(buf+length, &call_id_len, sizeof(call_id_len));
852         length += sizeof(call_id_len);
853         memcpy(buf+length, call_id, call_id_len);
854         length += call_id_len;
855         g_free(call_id);
856
857         memcpy(buf+length, &has_data, sizeof(has_data));
858         length += sizeof(has_data);
859
860         if (has_data) {
861                 memcpy(buf+length, &(data->buf_size), sizeof(data->buf_size));
862                 length += sizeof(data->buf_size);
863
864                 ret = socket_send(handle->fd, buf, length);
865                 if (ret > 0)
866                         ret = socket_send_data(handle->fd, data->buf, data->buf_size);
867         } else {
868                 ret = socket_send(handle->fd, buf, length);
869         }
870
871         if (ret < 0)
872                 return -1;
873
874         return 0;
875 }
876
877 API int pims_ipc_call(pims_ipc_h ipc, char *module, char *function, pims_ipc_data_h data_in,
878                 pims_ipc_data_h *data_out)
879 {
880         pims_ipc_s *handle = ipc;
881
882         RETV_IF(NULL == ipc, -1);
883         RETV_IF(NULL == module, -1);
884         RETV_IF(NULL == function, -1);
885
886         pthread_mutex_lock(&handle->call_status_mutex);
887         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
888                 pthread_mutex_unlock(&handle->call_status_mutex);
889                 ERR("the previous call is in progress : %p", ipc);
890                 return -1;
891         }
892         pthread_mutex_unlock(&handle->call_status_mutex);
893
894         if (__pims_ipc_send(handle, module, function, data_in) != 0)
895                 return -1;
896
897         if (__pims_ipc_receive(handle, data_out) != 0)
898                 return -1;
899
900         return 0;
901 }
902
903 static gboolean __call_async_idler_cb(gpointer data)
904 {
905         pims_ipc_s *handle = data;
906         pims_ipc_data_h dhandle;
907
908         RETV_IF(NULL == handle, FALSE);
909
910         dhandle = handle->dhandle_for_async_idler;
911         handle->dhandle_for_async_idler = NULL;
912
913         pthread_mutex_lock(&handle->call_status_mutex);
914         handle->call_status = PIMS_IPC_CALL_STATUS_READY;
915         pthread_mutex_unlock(&handle->call_status_mutex);
916
917         handle->call_async_callback((pims_ipc_h)handle, dhandle, handle->call_async_userdata);
918         pims_ipc_data_destroy(dhandle);
919
920         return FALSE;
921 }
922
923 static gboolean __pims_ipc_call_async_handler(GIOChannel *src, GIOCondition condition,
924                 gpointer data)
925 {
926         pims_ipc_s *handle = data;
927         pims_ipc_data_h dhandle = NULL;
928
929         if (__pims_ipc_receive(handle, &dhandle) == 0) {
930                 VERBOSE("call status = %d", handle->call_status);
931
932                 pthread_mutex_lock(&handle->call_status_mutex);
933                 if (handle->call_status != PIMS_IPC_CALL_STATUS_IN_PROGRESS) {
934                         pthread_mutex_unlock(&handle->call_status_mutex);
935                         pims_ipc_data_destroy(dhandle);
936                 } else {
937                         pthread_mutex_unlock(&handle->call_status_mutex);
938                         if (src == NULL) {    /* A response is arrived too quickly */
939                                 handle->dhandle_for_async_idler = dhandle;
940                                 g_idle_add(__call_async_idler_cb, handle);
941                         } else {
942                                 pthread_mutex_lock(&handle->call_status_mutex);
943                                 handle->call_status = PIMS_IPC_CALL_STATUS_READY;
944                                 pthread_mutex_unlock(&handle->call_status_mutex);
945
946                                 handle->call_async_callback((pims_ipc_h)handle, dhandle,
947                                                 handle->call_async_userdata);
948                                 pims_ipc_data_destroy(dhandle);
949                         }
950                 }
951         }
952         return FALSE;
953 }
954
955 API int pims_ipc_call_async(pims_ipc_h ipc, char *module, char *function,
956                 pims_ipc_data_h data_in, pims_ipc_call_async_cb callback, void *user_data)
957 {
958         pims_ipc_s *handle = ipc;
959         guint source_id = 0;
960
961         RETV_IF(NULL == ipc, -1);
962         RETV_IF(NULL == module, -1);
963         RETV_IF(NULL == function, -1);
964         RETV_IF(NULL == callback, -1);
965
966         pthread_mutex_lock(&handle->call_status_mutex);
967         if (handle->call_status != PIMS_IPC_CALL_STATUS_READY) {
968                 pthread_mutex_unlock(&handle->call_status_mutex);
969                 ERR("the previous call is in progress : %p", ipc);
970                 return -1;
971         }
972         pthread_mutex_unlock(&handle->call_status_mutex);
973
974         pthread_mutex_lock(&handle->call_status_mutex);
975         handle->call_status = PIMS_IPC_CALL_STATUS_IN_PROGRESS;
976         pthread_mutex_unlock(&handle->call_status_mutex);
977
978         handle->call_async_callback = callback;
979         handle->call_async_userdata = user_data;
980
981         /* add a callback for GIOChannel */
982         if (!handle->async_channel) {
983                 handle->async_channel = g_io_channel_unix_new(handle->fd);
984                 if (!handle->async_channel) {
985                         ERR("g_io_channel_unix_new error");
986                         return -1;
987                 }
988         }
989
990         source_id = g_io_add_watch(handle->async_channel, G_IO_IN,
991                         __pims_ipc_call_async_handler, handle);
992         handle->async_source_id = source_id;
993
994         if (__pims_ipc_send(handle, module, function, data_in) != 0) {
995                 g_source_remove(source_id);
996                 return -1;
997         }
998
999         __pims_ipc_call_async_handler(NULL, G_IO_NVAL, handle);
1000
1001         return 0;
1002 }
1003
1004 API int pims_ipc_is_call_in_progress(pims_ipc_h ipc)
1005 {
1006         int ret;
1007         pims_ipc_s *handle = ipc;
1008
1009         RETV_IF(NULL == ipc, FALSE);
1010
1011         pthread_mutex_lock(&handle->call_status_mutex);
1012         if (handle->call_status == PIMS_IPC_CALL_STATUS_IN_PROGRESS)
1013                 ret = TRUE;
1014         else
1015                 ret = FALSE;
1016         pthread_mutex_unlock(&handle->call_status_mutex);
1017         return ret;
1018 }
1019
1020 API int pims_ipc_subscribe(pims_ipc_h ipc, char *module, char *event,
1021                 pims_ipc_subscribe_cb callback, void *user_data)
1022 {
1023         gchar *call_id = NULL;
1024         pims_ipc_s *handle = ipc;
1025         pims_ipc_cb_s *cb_data = NULL;
1026
1027         RETV_IF(NULL == ipc, -1);
1028         RETV_IF(NULL == module, -1);
1029         RETV_IF(NULL == event, -1);
1030         RETV_IF(NULL == callback, -1);
1031         RETV_IF(NULL == handle->subscribe_cb_table, -1);
1032
1033         cb_data = g_new0(pims_ipc_cb_s, 1);
1034         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1035
1036         VERBOSE("subscribe cb id[%s]", call_id);
1037         cb_data->callback = callback;
1038         cb_data->user_data = user_data;
1039         g_hash_table_insert(handle->subscribe_cb_table, call_id, cb_data);
1040
1041         return 0;
1042 }
1043
1044 API int pims_ipc_unsubscribe(pims_ipc_h ipc, char *module, char *event)
1045 {
1046         gchar *call_id = NULL;
1047         pims_ipc_s *handle = ipc;
1048
1049         RETV_IF(NULL == ipc, -1);
1050         RETV_IF(NULL == module, -1);
1051         RETV_IF(NULL == event, -1);
1052         RETV_IF(NULL == handle->subscribe_cb_table, -1);
1053
1054         call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
1055
1056         VERBOSE("unsubscribe cb id[%s]", call_id);
1057
1058         if (g_hash_table_remove(handle->subscribe_cb_table, call_id) != TRUE) {
1059                 ERR("g_hash_table_remove error");
1060                 g_free(call_id);
1061                 return -1;
1062         }
1063
1064         g_free(call_id);
1065         return 0;
1066 }
1067
1068 API int pims_ipc_add_server_disconnected_cb(pims_ipc_h handle,
1069                 pims_ipc_server_disconnected_cb callback, void *user_data)
1070 {
1071         GList *cursor = NULL;
1072
1073         /* check already existed */
1074         pthread_mutex_lock(&__disconnect_cb_mutex);
1075         cursor = g_list_first(disconnected_list);
1076         while (cursor) {
1077                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1078                 if (disconnected && disconnected->handle == handle) {
1079                         ERR("Already set callback");
1080                         pthread_mutex_unlock(&__disconnect_cb_mutex);
1081                         return -1;
1082                 }
1083                 cursor = g_list_next(cursor);
1084         }
1085         pthread_mutex_unlock(&__disconnect_cb_mutex);
1086
1087         /* append callback */
1088         pims_ipc_server_disconnected_cb_t *disconnected = NULL;
1089         disconnected = calloc(1, sizeof(pims_ipc_server_disconnected_cb_t));
1090         if (NULL == disconnected) {
1091                 ERR("calloc() Fail");
1092                 return -1;
1093         }
1094         DBG("add disconnected");
1095         disconnected->handle = handle;
1096         disconnected->callback = callback;
1097         disconnected->user_data = user_data;
1098
1099         pthread_mutex_lock(&__disconnect_cb_mutex);
1100         disconnected_list = g_list_append(disconnected_list, disconnected);
1101         pthread_mutex_unlock(&__disconnect_cb_mutex);
1102
1103         return 0;
1104 }
1105
1106 API int pims_ipc_remove_server_disconnected_cb(pims_ipc_h handle)
1107 {
1108         pthread_mutex_lock(&__disconnect_cb_mutex);
1109
1110         GList *cursor = NULL;
1111         cursor = g_list_first(disconnected_list);
1112         while (cursor) {
1113                 pims_ipc_server_disconnected_cb_t *disconnected = cursor->data;
1114                 if (disconnected && disconnected->handle == handle) {
1115                         free(disconnected);
1116                         disconnected_list = g_list_delete_link(disconnected_list, cursor);
1117                         DBG("remove disconnected_cb");
1118                         break;
1119                 }
1120                 cursor = g_list_next(cursor);
1121         }
1122         pthread_mutex_unlock(&__disconnect_cb_mutex);
1123
1124         return 0;
1125 }
1126
1127 /* start deprecated */
1128 API int pims_ipc_unset_server_disconnected_cb()
1129 {
1130         pthread_mutex_lock(&__disconnect_cb_mutex);
1131         _server_disconnected_cb.callback = NULL;
1132         _server_disconnected_cb.user_data = NULL;
1133         pthread_mutex_unlock(&__disconnect_cb_mutex);
1134         return 0;
1135 }
1136
1137 API int pims_ipc_set_server_disconnected_cb(pims_ipc_server_disconnected_cb callback,
1138                 void *user_data)
1139 {
1140         pthread_mutex_lock(&__disconnect_cb_mutex);
1141         _server_disconnected_cb.callback = callback;
1142         _server_disconnected_cb.user_data = user_data;
1143         pthread_mutex_unlock(&__disconnect_cb_mutex);
1144         return 0;
1145 }
1146 /* end deprecated */