sync with 2.4
[platform/core/pim/pims-ipc.git] / src / pims-ipc-svc.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2013 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19
20 #include <unistd.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <glib.h>
25 #include <pthread.h>
26 #include <stdint.h>
27 #include <poll.h>                               // pollfds
28 #include <fcntl.h>                              //fcntl
29 #include <unistd.h>
30 #include <systemd/sd-daemon.h>
31 #include <errno.h>
32
33 #include <sys/stat.h>
34 #include <sys/un.h>                     // sockaddr_un
35 #include <sys/ioctl.h>          // ioctl
36 #include <sys/epoll.h>          // epoll
37 #include <sys/eventfd.h>        // eventfd
38 #include <sys/socket.h>         //socket
39 #include <sys/types.h>
40
41 #include "pims-internal.h"
42 #include "pims-debug.h"
43 #include "pims-socket.h"
44 #include "pims-ipc-data.h"
45 #include "pims-ipc-data-internal.h"
46 #include "pims-ipc-svc.h"
47
48 #define PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT  2
49
50 typedef struct {
51         char *service;
52         gid_t group;
53         mode_t mode;
54
55         // callback functions
56         GHashTable *cb_table;                   // call_id, cb_data
57
58         // Global socket info and epoll thread
59         int sockfd;
60         bool epoll_stop_thread;
61
62         /////////////////////////////////////////////
63         // router inproc eventfd
64         int router;
65         int delay_count;  // not need mutex
66         // epoll thread add client_fd, when receive, router read requests
67         GList *request_queue;    // client_id lists to send request
68         pthread_mutex_t request_data_queue_mutex;
69         GHashTable *request_data_queue;  // key : client id, data : GList pims_ipc_raw_data_s (client_fd, seq_no, request(command), additional data...)
70         // router add client when receive connecting request, remove client when disconneting request in router thread
71         // manager remove client when terminating client without disconnect request in router thread
72         GHashTable *client_worker_map;           // key : client_id, worker_fd, not need mutex
73         GList *client_id_fd_map;                 // pims_ipc_client_map_s
74         //key :client_id(pid:seq_no), data : client_fd
75
76         /////////////////////////////////////////////
77         pthread_mutex_t task_fds_mutex;
78         // when starting worker thread, register fd
79         // when endting worker thread, deregister fd
80         GHashTable *task_fds;    // worker_fd - worker data (worker fd, client_fd, request queue(GList), stop_thread)
81         int workers_max_count;
82
83         /////////////////////////////////////////////
84         // manager inproc eventfd
85         int manager;
86         // write by new worker thread, read by manager in router thread, need mutex
87         pthread_mutex_t manager_queue_from_worker_mutex;
88         GList *manager_queue_from_worker;        // worker_fd => add to workers
89         // write in epoll thread(for dead client), read by manager in router thread, need mutex
90         pthread_mutex_t manager_queue_from_epoll_mutex;
91         GList *manager_queue_from_epoll; // cliend_fd => find worker_fd => add to idle workers
92         // managed by manager, router find idle worker when connecting new client in router thread => remove from idle workers
93         GList *workers;          // worker_fd list, not need mutex
94         /////////////////////////////////////////////
95 } pims_ipc_svc_s;
96
97 typedef struct {
98         char *service;
99         gid_t group;
100         mode_t mode;
101
102         int publish_sockfd;
103         bool epoll_stop_thread;
104         pthread_mutex_t subscribe_fds_mutex;
105         GList *subscribe_fds;           // cliend fd list
106 } pims_ipc_svc_for_publish_s;
107
108 typedef struct {
109         int fd;
110         char *id;
111 }pims_ipc_client_map_s;
112
113 typedef struct {
114         pims_ipc_svc_call_cb callback;
115         void * user_data;
116 } pims_ipc_svc_cb_s;
117
118 typedef struct {
119         pims_ipc_svc_client_disconnected_cb callback;
120         void * user_data;
121 } pims_ipc_svc_client_disconnected_cb_t;
122
123 typedef struct {
124         int fd;
125         int worker_id;  // pthrad_self()
126         int client_fd;
127         bool stop_thread;
128         GList *queue;           // pims_ipc_raw_data_s list
129         pthread_mutex_t queue_mutex;
130 } pims_ipc_worker_data_s;
131
132 typedef struct{
133         char *client_id;
134         unsigned int client_id_len;
135         unsigned int seq_no;
136         char *call_id;
137         unsigned int call_id_len;
138         unsigned int is_data;
139         unsigned int data_len;
140         char *data;
141 }pims_ipc_raw_data_s;
142
143 typedef struct {
144         int client_fd;
145         int request_count;
146         GList *raw_data;                // pims_ipc_raw_data_s list
147         pthread_mutex_t raw_data_mutex;
148 }pims_ipc_request_s;
149
150 static pims_ipc_svc_s *_g_singleton = NULL;
151 static pims_ipc_svc_for_publish_s *_g_singleton_for_publish = NULL;
152
153 static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL};
154
155 static void __free_raw_data(pims_ipc_raw_data_s *data)
156 {
157         if (!data) return;
158
159         free(data->client_id);
160         free(data->call_id);
161         free(data->data);
162         free(data);
163 }
164
165 static void __worker_data_free(gpointer data)
166 {
167         pims_ipc_worker_data_s *worker_data = (pims_ipc_worker_data_s*)data;
168
169         pthread_mutex_lock(&worker_data->queue_mutex);
170         if (worker_data->queue) {
171                 GList *cursor = g_list_first(worker_data->queue);
172                 while(cursor) {
173                         GList *l = cursor;
174                         pims_ipc_raw_data_s *data = l->data;
175                         cursor = g_list_next(cursor);
176                         worker_data->queue = g_list_remove_link(worker_data->queue, l);
177                         g_list_free(l);
178                         __free_raw_data(data);
179                 }
180         }
181         pthread_mutex_unlock(&worker_data->queue_mutex);
182         free(worker_data);
183 }
184
185 API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode)
186 {
187         if (_g_singleton) {
188                 ERROR("Already exist");
189                 return -1;
190         }
191
192         _g_singleton = g_new0(pims_ipc_svc_s, 1);
193         _g_singleton->service = g_strdup(service);
194         _g_singleton->group = group;
195         _g_singleton->mode = mode;
196         _g_singleton->workers_max_count = PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT;
197         _g_singleton->cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
198         ASSERT(_g_singleton->cb_table);
199
200         pthread_mutex_init(&_g_singleton->request_data_queue_mutex, 0);
201         _g_singleton->request_queue = NULL;
202         _g_singleton->request_data_queue = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);        // client_id - pims_ipc_raw_data_s
203         ASSERT(_g_singleton->request_data_queue);
204         _g_singleton->client_worker_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL);         // client id - worker_fd mapping
205         ASSERT(_g_singleton->client_worker_map);
206         _g_singleton->delay_count = 0;
207
208         pthread_mutex_init(&_g_singleton->task_fds_mutex, 0);
209         _g_singleton->task_fds = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __worker_data_free);                // pims_ipc_worker_data_s
210         ASSERT(_g_singleton->task_fds);
211
212         pthread_mutex_init(&_g_singleton->manager_queue_from_epoll_mutex, 0);
213         _g_singleton->manager_queue_from_epoll = NULL;
214
215         pthread_mutex_init(&_g_singleton->manager_queue_from_worker_mutex, 0);
216         _g_singleton->manager_queue_from_worker = NULL;
217         _g_singleton->workers = NULL;
218
219         _g_singleton->epoll_stop_thread = false;
220
221         return 0;
222 }
223
224 API int pims_ipc_svc_deinit(void)
225 {
226         if (!_g_singleton)
227                 return -1;
228
229         g_free(_g_singleton->service);
230         g_hash_table_destroy(_g_singleton->cb_table);
231
232         pthread_mutex_destroy(&_g_singleton->request_data_queue_mutex);
233         g_hash_table_destroy(_g_singleton->client_worker_map);
234         g_hash_table_destroy(_g_singleton->request_data_queue);
235         g_list_free_full(_g_singleton->request_queue, g_free);
236
237         pthread_mutex_destroy(&_g_singleton->task_fds_mutex);
238         g_hash_table_destroy(_g_singleton->task_fds);
239
240         pthread_mutex_destroy(&_g_singleton->manager_queue_from_epoll_mutex);
241         g_list_free_full(_g_singleton->manager_queue_from_epoll, g_free);
242         pthread_mutex_destroy(&_g_singleton->manager_queue_from_worker_mutex);
243         g_list_free(_g_singleton->manager_queue_from_worker);
244
245         GList *cursor = g_list_first(_g_singleton->client_id_fd_map);
246         while(cursor) {
247                 pims_ipc_client_map_s *client = cursor->data;
248                 _g_singleton->client_id_fd_map = g_list_remove_link(_g_singleton->client_id_fd_map, cursor);                    //free(client_id);
249                 free(client->id);
250                 free(client);
251                 g_list_free(cursor);
252                 cursor = g_list_first(_g_singleton->client_id_fd_map);
253         }
254         g_list_free(_g_singleton->client_id_fd_map);
255
256         g_list_free(_g_singleton->workers);
257         g_free(_g_singleton);
258         _g_singleton = NULL;
259
260         return 0;
261 }
262
263 API int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata)
264 {
265         pims_ipc_svc_cb_s *cb_data = NULL;
266         gchar *call_id = NULL;
267
268         if (!module || !function || !callback) {
269                 ERROR("Invalid argument");
270                 return -1;
271         }
272         cb_data = g_new0(pims_ipc_svc_cb_s, 1);
273         call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
274
275         VERBOSE("register cb id[%s]", call_id);
276         cb_data->callback = callback;
277         cb_data->user_data = userdata;
278         g_hash_table_insert(_g_singleton->cb_table, call_id, cb_data);
279
280         return 0;
281 }
282
283 API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode)
284 {
285         if (_g_singleton_for_publish) {
286                 ERROR("Already exist");
287                 return -1;
288         }
289
290         _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_s, 1);
291         _g_singleton_for_publish->service = g_strdup(service);
292         _g_singleton_for_publish->group = group;
293         _g_singleton_for_publish->mode = mode;
294         _g_singleton_for_publish->subscribe_fds = NULL;
295
296         pthread_mutex_init(&_g_singleton_for_publish->subscribe_fds_mutex, 0);
297
298         return 0;
299 }
300
301 API int pims_ipc_svc_deinit_for_publish(void)
302 {
303         if (!_g_singleton_for_publish)
304                 return -1;
305
306         pthread_mutex_destroy(&_g_singleton_for_publish->subscribe_fds_mutex);
307         g_list_free(_g_singleton_for_publish->subscribe_fds);
308
309         g_free(_g_singleton_for_publish->service);
310         g_free(_g_singleton_for_publish);
311         _g_singleton_for_publish = NULL;
312
313         return 0;
314 }
315
316 API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data)
317 {
318         pims_ipc_svc_for_publish_s *ipc_svc = _g_singleton_for_publish;
319         gboolean is_valid = FALSE;
320         gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
321         pims_ipc_data_s *data_in = (pims_ipc_data_s*)data;
322         unsigned int call_id_len = strlen(call_id);
323         unsigned int is_data = FALSE;
324
325         do {
326                 // make publish data
327                 unsigned int len = sizeof(unsigned int)                                         // total size
328                         + call_id_len + sizeof(unsigned int)                    // call_id
329                         + sizeof(unsigned int);                                                 // is data
330                 unsigned int total_len = len;
331
332                 if (data_in) {
333                         is_data = TRUE;
334                         len += sizeof(unsigned int);
335                         total_len = len + data_in->buf_size;                    // data
336                 }
337
338                 char buf[len+1];
339                 int length = 0;
340                 memset(buf, 0x0, len+1);
341
342                 // total_size
343                 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
344                 length += sizeof(unsigned int);
345
346                 // call_id
347                 memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
348                 length += sizeof(unsigned int);
349                 memcpy(buf+length, (void*)(call_id), call_id_len);
350                 length += call_id_len;
351                 g_free(call_id);
352
353                 // is_data
354                 memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
355                 length += sizeof(unsigned int);
356
357                 // data
358                 if (is_data) {
359                         memcpy(buf+length, (void*)&(data_in->buf_size), sizeof(unsigned int));
360                         length += sizeof(unsigned int);
361                 }
362
363                 // Publish to clients
364                 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
365                 GList *cursor = g_list_first(ipc_svc->subscribe_fds);
366                 int ret = 0;
367                 while(cursor) {
368                         int fd = (int)cursor->data;
369                         ret = socket_send(fd, buf, length);
370                         if (ret < 0) {
371                                 ERROR("socket_send publish error : %d", ret);
372                         }
373
374                         if (is_data) {
375                                 ret = socket_send_data(fd, data_in->buf, data_in->buf_size);
376                                 if (ret < 0) {
377                                         ERROR("socket_send_data publish error : %d", ret);
378                                 }
379                         }
380                         cursor = g_list_next(cursor);
381                 }
382                 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
383
384                 is_valid = TRUE;
385         } while (0);
386
387         if (is_valid == FALSE)
388                 return -1;
389         return 0;
390 }
391
392 static void __run_callback(int worker_id, char *call_id, pims_ipc_data_h dhandle_in, pims_ipc_data_h *dhandle_out)
393 {
394         pims_ipc_svc_cb_s *cb_data = NULL;
395
396         VERBOSE("Call id [%s]", call_id);
397
398         cb_data = (pims_ipc_svc_cb_s*)g_hash_table_lookup(_g_singleton->cb_table, call_id);
399         if (cb_data == NULL) {
400                 VERBOSE("unable to find %s", call_id);
401                 return;
402         }
403
404         cb_data->callback((pims_ipc_h)worker_id, dhandle_in, dhandle_out, cb_data->user_data);
405 }
406
407 static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data, pims_ipc_raw_data_s**out)
408 {
409         pims_ipc_raw_data_s *raw_data = NULL;
410         raw_data = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s));
411         pims_ipc_data_s *data_in = (pims_ipc_data_s*)data;
412
413         raw_data->call_id = strdup(call_id);
414         raw_data->call_id_len = strlen(raw_data->call_id);
415         raw_data->seq_no = seq_no;
416
417         if (data_in && data_in->buf_size > 0) {
418                 raw_data->is_data = TRUE;
419                 raw_data->data = calloc(1, data_in->buf_size+1);
420                 memcpy(raw_data->data, data_in->buf, data_in->buf_size);
421                 raw_data->data_len = data_in->buf_size;
422         }
423         else {
424                 raw_data->is_data = FALSE;
425                 raw_data->data_len = 0;
426                 raw_data->data = NULL;
427         }
428         *out = raw_data;
429         return;
430 }
431
432 static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data)
433 {
434         int ret = 0;
435         unsigned int client_id_len = strlen(client_id);
436
437         if (!data) {
438                 INFO("No data to send NULL\n");
439                 return -1;
440         }
441
442         unsigned int len = sizeof(unsigned int)                 // total size
443                 + client_id_len + sizeof(unsigned int)          // client_id
444                 + sizeof(unsigned int)                                                  // seq_no
445                 + data->call_id_len + sizeof(unsigned int)      // call_id
446                 + sizeof(unsigned int);                                                 // is data
447         unsigned int total_len = len;
448
449         if (data->is_data) {
450                 len += sizeof(unsigned int); // data
451                 total_len = len + data->data_len;               // data
452         }
453
454         INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id, data->call_id, data->seq_no, len, total_len);
455
456         char buf[len+1];
457
458         int length = 0;
459         memset(buf, 0x0, len+1);
460
461         // total_len
462         memcpy(buf, (void*)&total_len, sizeof(unsigned int));
463         length += sizeof(unsigned int);
464
465         // client_id
466         memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
467         length += sizeof(unsigned int);
468         memcpy(buf+length, (void*)(client_id), client_id_len);
469         length += client_id_len;
470
471         // seq_no
472         memcpy(buf+length, (void*)&(data->seq_no), sizeof(unsigned int));
473         length += sizeof(unsigned int);
474
475         // call id
476         memcpy(buf+length, (void*)&(data->call_id_len), sizeof(unsigned int));
477         length += sizeof(unsigned int);
478         memcpy(buf+length, (void*)(data->call_id), data->call_id_len);
479         length += data->call_id_len;
480
481         // is_data
482         memcpy(buf+length, (void*)&(data->is_data), sizeof(unsigned int));
483         length += sizeof(unsigned int);
484
485         if (data->is_data) {
486                 memcpy(buf+length, (void*)&(data->data_len), sizeof(unsigned int));
487                 length += sizeof(unsigned int);
488                 ret = socket_send(fd, buf, length);
489
490                 // send data
491                 if (ret > 0)
492                         ret += socket_send_data(fd, data->data, data->data_len);
493         }
494         else
495                 ret = socket_send(fd, buf, length);
496
497         return ret;
498 }
499
500 static gboolean __worker_raw_data_pop(pims_ipc_worker_data_s *worker, pims_ipc_raw_data_s **data)
501 {
502         if (!worker)
503                 return FALSE;
504
505         pthread_mutex_lock(&worker->queue_mutex);
506         if (!worker->queue) {
507                 pthread_mutex_unlock(&worker->queue_mutex);
508                 *data = NULL;
509                 return FALSE;
510         }
511
512         *data = g_list_first(worker->queue)->data;
513         worker->queue = g_list_delete_link(worker->queue, g_list_first(worker->queue));
514         pthread_mutex_unlock(&worker->queue_mutex);
515
516         return TRUE;
517 }
518
519 static void* __worker_loop(void *data)
520 {
521         int ret;
522         int worker_id;
523         int worker_fd;
524         pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
525         pims_ipc_worker_data_s *worker_data;
526         bool disconnected = false;
527
528         worker_fd = eventfd(0, 0);
529         if (worker_fd == -1)
530                 return NULL;
531         worker_id = (int)pthread_self();
532
533         worker_data = calloc(1, sizeof(pims_ipc_worker_data_s));
534         worker_data->fd = worker_fd;
535         worker_data->worker_id = worker_id;
536         worker_data->client_fd = -1;
537         worker_data->stop_thread = false;
538         pthread_mutex_init(&worker_data->queue_mutex, 0);
539
540         pthread_mutex_lock(&ipc_svc->task_fds_mutex);
541         g_hash_table_insert(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd), worker_data);
542         pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
543
544         pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex);
545         ipc_svc->manager_queue_from_worker = g_list_append(ipc_svc->manager_queue_from_worker, (void*)worker_fd);
546         pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
547
548         write_command(ipc_svc->manager, 1);
549         DEBUG("worker register to manager : worker_id(%08x00), worker_fd(%d)\n", worker_id, worker_fd);
550
551         struct pollfd *pollfds = (struct pollfd*) malloc (1 * sizeof (struct pollfd));
552         pollfds[0].fd = worker_fd;
553         pollfds[0].events = POLLIN;
554
555         while (!worker_data->stop_thread) {
556                 while(1) {
557                         if (worker_data->stop_thread)
558                                 break;
559                         ret = poll(pollfds, 1, 3000);   // waiting command from router
560                         if (ret == -1 && errno == EINTR) {
561                                 continue;
562                         }
563                         break;
564                 }
565
566                 if (worker_data->stop_thread)
567                         break;
568
569                 if (ret > 0) {
570                         pims_ipc_raw_data_s *raw_data = NULL;
571                         pims_ipc_raw_data_s *result = NULL;
572
573                         if (pollfds[0].revents & POLLIN) {
574                                 uint64_t dummy;
575                                 read_command(pollfds[0].fd, &dummy);
576                                 if (__worker_raw_data_pop(worker_data, &raw_data)) {
577                                         pims_ipc_data_h data_in = NULL;
578                                         pims_ipc_data_h data_out = NULL;
579
580                                         if (strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id) == 0) {
581
582                                         }
583                                         else if (strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id) == 0) {
584                                                 disconnected = true;
585                                         }
586                                         else {
587                                                 data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len);
588                                                 raw_data->data = NULL;
589                                                 raw_data->data_len = 0;
590                                                 raw_data->is_data = false;
591                                                 __run_callback(worker_id, raw_data->call_id, data_in, &data_out);
592                                                 pims_ipc_data_destroy(data_in);
593                                         }
594
595                                         if (data_out) {
596                                                 __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result);
597                                                 pims_ipc_data_destroy(data_out);
598                                         }
599                                         else
600                                                 __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result);
601
602                                         if (worker_data->client_fd != -1)
603                                                 __send_raw_data(worker_data->client_fd, raw_data->client_id, result);
604                                         __free_raw_data(raw_data);
605                                         __free_raw_data(result);
606                                 }
607                         }
608                 }
609         }
610
611         if (!disconnected)
612                 ERROR("client fd closed, worker_fd : %d", worker_fd);
613         INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd);
614
615         pthread_mutex_lock(&ipc_svc->task_fds_mutex);
616         g_hash_table_remove(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd));             // __worker_data_free will be called
617         pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
618
619         close(worker_fd);
620         free ((void*)pollfds);
621
622         if (_client_disconnected_cb.callback)
623                 _client_disconnected_cb.callback((pims_ipc_h)worker_id, _client_disconnected_cb.user_data);
624
625         return NULL;
626 }
627
628 static void __launch_thread(void *(*start_routine) (void *), void *data)
629 {
630         pthread_t worker;
631         pthread_attr_t attr;
632
633         // set kernel thread
634         pthread_attr_init(&attr);
635         pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
636
637         pthread_create(&worker, &attr, start_routine, data);
638         // detach this thread
639         pthread_detach(worker);
640 }
641
642 static gboolean __is_worker_available()
643 {
644         if (_g_singleton->workers)
645                 return TRUE;
646         else
647                 return FALSE;
648 }
649
650 static int __get_worker(const char *client_id, int *worker_id)
651 {
652         ASSERT(client_id);
653         ASSERT(worker_id);
654
655         if (!__is_worker_available()) {
656                 ERROR("There is no idle worker");
657                 return -1;
658         }
659         *worker_id = (int)(g_list_first(_g_singleton->workers)->data);
660         _g_singleton->workers = g_list_delete_link(_g_singleton->workers,
661                         g_list_first(_g_singleton->workers));
662
663         g_hash_table_insert(_g_singleton->client_worker_map, g_strdup(client_id), GINT_TO_POINTER(*worker_id));
664
665         return 0;
666 }
667
668 static int __find_worker(const char *client_id, int *worker_fd)
669 {
670         char *orig_pid = NULL;
671         int fd;
672
673         ASSERT(client_id);
674         ASSERT(worker_fd);
675
676         if (FALSE == g_hash_table_lookup_extended(_g_singleton->client_worker_map, client_id,
677                                 (gpointer*)&orig_pid, (gpointer*)&fd)) {
678                 VERBOSE("unable to find worker id for %s", client_id);
679                 return -1;
680         }
681
682         *worker_fd = GPOINTER_TO_INT(fd);
683         return 0;
684 }
685
686 static bool __request_pop(pims_ipc_request_s *data_queue, pims_ipc_raw_data_s **data)
687 {
688         bool ret = false;
689         GList *cursor;
690
691         pthread_mutex_lock(&data_queue->raw_data_mutex);
692         cursor = g_list_first(data_queue->raw_data);
693         if (cursor) {
694                 *data = cursor->data;
695                 data_queue->raw_data = g_list_delete_link(data_queue->raw_data, cursor);
696                 (data_queue->request_count)--;
697
698                 ret = true;
699         }
700         else
701                 *data = NULL;
702
703         pthread_mutex_unlock(&data_queue->raw_data_mutex);
704         return ret;
705 }
706
707 static bool __worker_raw_data_push(pims_ipc_worker_data_s *worker_data, int client_fd, pims_ipc_raw_data_s *data)
708 {
709         pthread_mutex_lock(&worker_data->queue_mutex);
710         worker_data->queue = g_list_append(worker_data->queue, data);
711         worker_data->client_fd = client_fd;
712         pthread_mutex_unlock(&worker_data->queue_mutex);
713
714         return true;
715 }
716
717 static int __process_router_event(pims_ipc_svc_s *ipc_svc, gboolean for_queue)
718 {
719         gboolean is_valid = FALSE;
720         pims_ipc_request_s *data_queue = NULL;
721         GList *queue_cursor = NULL;
722         int worker_fd = 0;
723         char *client_id = NULL;
724         int *org_fd;
725         int ret;
726
727         do {
728                 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
729                 queue_cursor = g_list_first(ipc_svc->request_queue);
730                 if (NULL == queue_cursor) {
731                         pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
732                         return 0;
733                 }
734                 client_id = (char *)(queue_cursor->data);
735                 ASSERT(client_id != NULL);
736                 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
737
738                 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd, (gpointer*)&data_queue);
739
740                 if (for_queue)
741                         ipc_svc->delay_count--;
742
743                 if (ret == TRUE && data_queue) {
744                         int *org_fd;
745                         pims_ipc_worker_data_s *worker_data = NULL;
746
747                         pthread_mutex_lock(&data_queue->raw_data_mutex);
748                         GList *cursor = g_list_first(data_queue->raw_data);
749                         if (!cursor) {
750                                 pthread_mutex_unlock(&data_queue->raw_data_mutex);
751                                 break;
752                         }
753
754                         pims_ipc_raw_data_s *data = (pims_ipc_raw_data_s*)(cursor->data);
755                         char *call_id = data->call_id;
756                         int client_fd = data_queue->client_fd;
757
758                         ASSERT(call_id != NULL);
759
760                         VERBOSE("call_id = [%s]", call_id);
761                         if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0) {
762                                 // Get a worker. If cannot get a worker, create a worker and enqueue a current request
763                                 __launch_thread(__worker_loop, ipc_svc);
764                                 if (__get_worker((const char*)client_id, &worker_fd) != 0) {
765                                         ipc_svc->delay_count++;
766                                         pthread_mutex_unlock(&data_queue->raw_data_mutex);
767                                         is_valid = TRUE;
768                                         break;
769                                 }
770                         }
771                         else {
772                                 // Find a worker
773                                 if (__find_worker((const char*)client_id, &worker_fd) != 0) {
774                                         ERROR("unable to find a worker");
775                                         pthread_mutex_unlock(&data_queue->raw_data_mutex);
776                                         break;
777                                 }
778                         }
779                         pthread_mutex_unlock(&data_queue->raw_data_mutex);
780
781                         VERBOSE("routing client_id : %s, seq_no: %d, client_fd = %d, worker fd = %d", client_id, data->seq_no, client_fd, worker_fd);
782
783                         if (worker_fd <= 0)
784                                 break;
785
786                         pthread_mutex_lock(&ipc_svc->task_fds_mutex);
787                         if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds,
788                                                 GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) {
789                                 ERROR("hash lookup fail : worker_fd (%d)", worker_fd);
790                                 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
791                                 break;
792                         }
793
794                         if (__request_pop(data_queue, &data)) {
795                                 __worker_raw_data_push(worker_data, client_fd, data);
796                                 write_command(worker_fd, 1);
797                         }
798
799                         pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
800                 }
801
802                 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
803                 free(client_id);
804                 ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, queue_cursor);
805                 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
806
807                 is_valid = TRUE;
808         } while (0);
809
810         if (is_valid == FALSE)
811                 return -1;
812
813         return 1;
814 }
815
816 static int __process_manager_event(pims_ipc_svc_s *ipc_svc)
817 {
818         GList *cursor = NULL;
819         int worker_fd;
820
821         // client socket terminated without disconnect request
822         pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex);
823         if (ipc_svc->manager_queue_from_epoll) {
824                 cursor = g_list_first(ipc_svc->manager_queue_from_epoll);
825                 char *client_id = (char*)cursor->data;
826                 __find_worker(client_id, &worker_fd);
827
828                 ipc_svc->manager_queue_from_epoll = g_list_delete_link(ipc_svc->manager_queue_from_epoll, cursor);
829                 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
830
831                 // remove client_fd
832                 g_hash_table_remove(ipc_svc->client_worker_map, client_id);
833                 free(client_id);
834
835                 // stop worker thread
836                 if (worker_fd) {
837                         int *org_fd;
838                         pims_ipc_worker_data_s *worker_data;
839
840                         pthread_mutex_lock(&ipc_svc->task_fds_mutex);
841                         if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds,
842                                                 GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) {
843                                 ERROR("g_hash_table_lookup_extended fail : worker_fd (%d)", worker_fd);
844                                 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
845                                 return -1;
846                         }
847                         worker_data->stop_thread = true;
848                         worker_data->client_fd = -1;
849                         pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
850
851                         write_command(worker_fd, 1);
852                         VERBOSE("write command to worker terminate (worker_fd : %d)", worker_fd);
853                 }
854                 return 0;
855         }
856         pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
857
858         // create new worker
859         pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex);
860         if (ipc_svc->manager_queue_from_worker) {
861
862                 cursor = g_list_first(ipc_svc->manager_queue_from_worker);
863                 while (cursor) {
864                         worker_fd = (int)cursor->data;
865                         ipc_svc->manager_queue_from_worker = g_list_delete_link(ipc_svc->manager_queue_from_worker, cursor);
866
867                         if (worker_fd) {
868                                 DEBUG("add idle worker_fd : %d", worker_fd);
869                                 ipc_svc->workers = g_list_append(ipc_svc->workers, (void*)worker_fd);
870                         }
871                         cursor = g_list_first(ipc_svc->manager_queue_from_worker);
872                 }
873                 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
874                 return 0;
875         }
876         pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
877
878         return 0;
879 }
880
881 // if delete = true, steal client_id, then free(client_id)
882 // if delete = false, return client_id pointer, then do no call free(client_id
883 static int __find_client_id(pims_ipc_svc_s *ipc_svc, int client_fd, bool delete, char **client_id)
884 {
885         pims_ipc_client_map_s *client;
886         GList *cursor = NULL;
887         cursor = g_list_first(ipc_svc->client_id_fd_map);
888         while(cursor) {
889                 client = cursor->data;
890                 if (client->fd == client_fd) {
891                         *client_id = client->id;
892                         if (delete) {
893                                 client->id = NULL;
894                                 ipc_svc->client_id_fd_map = g_list_delete_link(ipc_svc->client_id_fd_map, cursor);                      //free(client);
895                                 free(client);
896                         }
897                         return 0;
898                 }
899                 cursor = g_list_next(cursor);
900         }
901         return -1;
902 }
903
904 static void __request_push(pims_ipc_svc_s *ipc_svc, char *client_id, int client_fd, pims_ipc_raw_data_s *data)
905 {
906         int ret;
907         int *org_fd;
908         pims_ipc_request_s *data_queue = NULL;
909
910         pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
911         ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue);
912         if (ret == TRUE && data_queue) {
913         }
914         else {
915                 data_queue = calloc(1, sizeof(pims_ipc_request_s));
916                 data_queue->request_count = 0;
917                 pthread_mutex_init(&data_queue->raw_data_mutex, 0);
918
919                 g_hash_table_insert(ipc_svc->request_data_queue, g_strdup(client_id), data_queue);
920         }
921         ipc_svc->request_queue = g_list_append(ipc_svc->request_queue, g_strdup(client_id));
922         pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
923
924         pthread_mutex_lock(&data_queue->raw_data_mutex);
925         data_queue->raw_data = g_list_append(data_queue->raw_data, data);
926         data_queue->client_fd = client_fd;
927         data_queue->request_count++;
928         pthread_mutex_unlock(&data_queue->raw_data_mutex);
929 }
930
931 static void __delete_request_queue(pims_ipc_svc_s *ipc_svc, char *client_id)
932 {
933         pims_ipc_request_s *data_queue = NULL;
934         int ret;
935         int *org_fd;
936         GList *l;
937         GList *cursor;
938
939         pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
940         ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue);
941         if (ret == TRUE)
942                 g_hash_table_remove(ipc_svc->request_data_queue, (void*)client_id);
943
944         cursor = g_list_first(ipc_svc->request_queue);
945         while (cursor) {
946                 l = cursor;
947                 char *id = l->data;
948                 cursor = g_list_next(cursor);
949                 if (id && strcmp(id, client_id) == 0) {
950                         free(id);
951                         ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, l);
952                 }
953         }
954         pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
955
956         if (data_queue) {
957                 pthread_mutex_lock(&data_queue->raw_data_mutex);
958                 cursor = g_list_first(data_queue->raw_data);
959                 pims_ipc_raw_data_s *data;
960                 while(cursor) {
961                         l = cursor;
962                         data = (pims_ipc_raw_data_s *)cursor->data;
963                         cursor = g_list_next(cursor);
964                         data_queue->raw_data = g_list_delete_link(data_queue->raw_data, l);
965                         __free_raw_data(data);
966                 }
967                 g_list_free(data_queue->raw_data);
968                 pthread_mutex_unlock(&data_queue->raw_data_mutex);
969                 pthread_mutex_destroy(&data_queue->raw_data_mutex);
970                 free(data_queue);
971         }
972 }
973
974 static int __send_identify(int fd, unsigned int seq_no, char *id, int id_len)
975 {
976         int len = sizeof(unsigned int)                                  // total size
977                 + id_len + sizeof(unsigned int)         // id
978                 + sizeof(unsigned int);                         // seq_no
979
980         char buf[len+1];
981
982         int length = 0;
983         memset(buf, 0x0, len+1);
984
985         // total len
986         memcpy(buf, (void*)&len, sizeof(unsigned int));
987         length += sizeof(unsigned int);
988
989         // id
990         memcpy(buf+length, (void*)&(id_len), sizeof(unsigned int));
991         length += sizeof(unsigned int);
992         memcpy(buf+length, (void*)(id), id_len);
993         length += id_len;
994
995         // seq_no
996         memcpy(buf+length, (void*)&(seq_no), sizeof(unsigned int));
997         length += sizeof(unsigned int);
998
999         return socket_send(fd, buf, length);
1000 }
1001
1002 static int __recv_raw_data(int fd, pims_ipc_raw_data_s **data, bool *identity)
1003 {
1004         int len = 0;
1005         pims_ipc_raw_data_s *temp;
1006
1007         /* read the size of message. note that ioctl is non-blocking */
1008         if (ioctl(fd, FIONREAD, &len)) {
1009                 ERROR("ioctl failed: %d", errno);
1010                 return -1;
1011         }
1012
1013         /* when server or client closed socket */
1014         if (len == 0) {
1015                 INFO("[IPC Socket] connection is closed");
1016                 return 0;
1017         }
1018
1019         temp = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s));
1020         temp->client_id = NULL;
1021         temp->client_id_len = 0;
1022         temp->call_id = NULL;
1023         temp->call_id_len = 0;
1024         temp->seq_no = 0;
1025         temp->is_data = FALSE;
1026         temp->data = NULL;
1027         temp->data_len = 0;
1028
1029         int ret = 0;
1030         unsigned int read_len = 0;
1031         unsigned int total_len = 0;
1032         unsigned int is_data = FALSE;
1033
1034         do {
1035                 // total length
1036                 ret = read(fd, (void *)&total_len, sizeof(unsigned int));
1037                 if (ret < 0) {   ERROR("read error"); break;            }
1038                 read_len += ret;
1039
1040                 // client_id
1041                 ret  = read(fd, (void *)&(temp->client_id_len), sizeof(unsigned int));
1042                 if (ret < 0) {   ERROR("read error"); break;            }
1043                 read_len += ret;
1044
1045                 temp->client_id = calloc(1, temp->client_id_len+1);
1046                 ret = socket_recv(fd, (void *)&(temp->client_id), temp->client_id_len);
1047                 if (ret < 0) {   ERROR("socket_recv error"); break;             }
1048                 read_len += ret;
1049
1050                 // sequnce no
1051                 ret = read(fd, (void *)&(temp->seq_no), sizeof(unsigned int));
1052                 if (ret < 0) {   ERROR("read error"); break;            }
1053                 read_len += ret;
1054
1055                 if (total_len == read_len) {
1056                         *data = temp;
1057                         *identity = true;
1058                         return read_len;
1059                 }
1060
1061                 // call_id
1062                 ret  = read(fd, (void *)&(temp->call_id_len), sizeof(unsigned int));
1063                 if (ret < 0)    { ERROR("read error"); break;           }
1064                 read_len += ret;
1065
1066                 temp->call_id = calloc(1, temp->call_id_len+1);
1067                 ret = socket_recv(fd, (void *)&(temp->call_id), temp->call_id_len);
1068                 if (ret < 0) {   ERROR("socket_recv error"); break;             }
1069                 read_len += ret;
1070
1071                 // is_data
1072                 ret = read(fd, (void *)&(is_data), sizeof(unsigned int));
1073                 if (ret < 0) {   ERROR("read error"); break;            }
1074                 read_len += ret;
1075
1076                 // data
1077                 if (is_data) {
1078                         temp->is_data = TRUE;
1079                         ret = read(fd, (void *)&(temp->data_len), sizeof(unsigned int));
1080                         if (ret < 0) {  ERROR("read error"); break;             }
1081                         read_len += ret;
1082
1083                         temp->data = calloc(1, temp->data_len+1);
1084                         ret = socket_recv(fd, (void *)&(temp->data), temp->data_len);
1085                         if (ret < 0) {  ERROR("socket_recv error"); break;              }
1086                         read_len += ret;
1087                 }
1088
1089                 INFO("client_id : %s, call_id : %s, seq_no : %d", temp->client_id, temp->call_id, temp->seq_no);
1090
1091                 *data = temp;
1092                 *identity = false;
1093         }while(0);
1094
1095         if (ret < 0) {
1096                 __free_raw_data(temp);
1097                 *data = NULL;
1098                 *identity = false;
1099         }
1100
1101         return read_len;
1102 }
1103
1104 static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data)
1105 {
1106         int ret;
1107         int event_fd = g_io_channel_unix_get_fd(src);
1108         char *client_id = NULL;
1109         pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1110
1111         if (G_IO_HUP & condition) {
1112                 INFO("client closed ------------------------client_fd : %d", event_fd);
1113
1114                 close(event_fd);
1115
1116                 // Find client_id
1117                 __find_client_id(ipc_svc, event_fd, true, &client_id);
1118
1119                 // Send client_id to manager to terminate worker thread
1120                 if (client_id) {
1121                         pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex);
1122                         ipc_svc->manager_queue_from_epoll = g_list_append(ipc_svc->manager_queue_from_epoll, (void*)g_strdup(client_id));
1123                         pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
1124                         write_command(ipc_svc->manager, 1);
1125
1126                         __delete_request_queue(ipc_svc, client_id);
1127                         free(client_id);
1128                 }
1129
1130                 return FALSE;
1131         }
1132
1133         // receive data from client
1134         int recv_len;
1135         bool identity = false;
1136         pims_ipc_raw_data_s *req = NULL;
1137
1138         recv_len = __recv_raw_data(event_fd, &req, &identity);
1139         if (recv_len > 0) {
1140                 // send command to router
1141                 if (identity) {
1142                         pims_ipc_client_map_s *client = (pims_ipc_client_map_s*)calloc(1, sizeof(pims_ipc_client_map_s));
1143                         client->fd = event_fd;
1144                         client->id = req->client_id;
1145                         req->client_id = NULL;
1146                         ipc_svc->client_id_fd_map = g_list_append(ipc_svc->client_id_fd_map, client);
1147
1148                         // send server pid to client
1149                         char temp[100];
1150                         snprintf(temp, sizeof(temp), "%x", getpid());
1151                         ret = __send_identify(event_fd, req->seq_no, temp, strlen(temp));
1152
1153                         __free_raw_data(req);
1154                         if (ret < 0)
1155                                 return FALSE;
1156                         return TRUE;
1157                 }
1158
1159                 __find_client_id(ipc_svc, event_fd, false, &client_id);
1160
1161                 if (client_id) {
1162                         __request_push(ipc_svc, client_id, event_fd, req);
1163                         write_command(ipc_svc->router, 1);
1164                 }
1165                 else
1166                         ERROR("__find_client_id fail : event_fd (%d)", event_fd);
1167         }
1168         else {
1169                 ERROR("receive invalid : %d", event_fd);
1170                 close(event_fd);
1171                 return FALSE;
1172         }
1173
1174         return TRUE;
1175 }
1176
1177 static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data)
1178 {
1179         GIOChannel *channel;
1180         pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1181         int client_sockfd = -1;
1182         int sockfd = ipc_svc->sockfd;
1183         struct sockaddr_un clientaddr;
1184         socklen_t client_len = sizeof(clientaddr);
1185
1186         client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len);
1187         if (-1 == client_sockfd) {
1188                 ERROR("accept error : %s", strerror(errno));
1189                 return TRUE;
1190         }
1191
1192         channel = g_io_channel_unix_new(client_sockfd);
1193         g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data);
1194         g_io_channel_unref(channel);
1195
1196         return TRUE;
1197 }
1198
1199 static void* __main_loop(void *user_data)
1200 {
1201         int ret;
1202         struct sockaddr_un addr;
1203         GIOChannel *gio = NULL;
1204         pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)user_data;
1205
1206         if (sd_listen_fds(1) == 1 && sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, ipc_svc->service, 0) > 0) {
1207                  ipc_svc->sockfd = SD_LISTEN_FDS_START;
1208         }
1209         else {
1210                 unlink(ipc_svc->service);
1211                 ipc_svc->sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
1212
1213                 bzero(&addr, sizeof(addr));
1214                 addr.sun_family = AF_UNIX;
1215                 snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service);
1216
1217                 ret = bind(ipc_svc->sockfd, (struct sockaddr *)&addr, sizeof(addr));
1218                 if (ret != 0)
1219                         ERROR("bind error :%d", ret);
1220                 ret = listen(ipc_svc->sockfd, 30);
1221
1222                 ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
1223                 ret = chmod(ipc_svc->service, ipc_svc->mode);
1224         }
1225
1226         gio = g_io_channel_unix_new(ipc_svc->sockfd);
1227
1228         g_io_add_watch(gio, G_IO_IN, __socket_handler, (gpointer)ipc_svc);
1229
1230         return NULL;
1231 }
1232
1233 static int __open_router_fd(pims_ipc_svc_s *ipc_svc)
1234 {
1235         int ret = -1;
1236         int flags;
1237         int router;
1238         int manager;
1239
1240         // router inproc eventfd
1241         router = eventfd(0,0);
1242         if (-1 == router) {
1243                 ERROR("eventfd error : %d", errno);
1244                 return -1;
1245         }
1246         VERBOSE("router :%d\n", router);
1247
1248         flags = fcntl(router, F_GETFL, 0);
1249         if (flags == -1)
1250                 flags = 0;
1251         ret = fcntl (router, F_SETFL, flags | O_NONBLOCK);
1252         VERBOSE("rounter fcntl : %d\n", ret);
1253
1254         // manager inproc eventfd
1255         manager = eventfd(0,0);
1256         if (-1 == manager) {
1257                 ERROR("eventfd error : %d", errno);
1258                 close(router);
1259                 return -1;
1260         }
1261         VERBOSE("manager :%d\n", manager);
1262
1263         flags = fcntl(manager, F_GETFL, 0);
1264         if (flags == -1)
1265                 flags = 0;
1266         ret = fcntl (manager, F_SETFL, flags | O_NONBLOCK);
1267         VERBOSE("manager fcntl : %d\n", ret);
1268
1269         ipc_svc->router = router;
1270         ipc_svc->manager = manager;
1271
1272         return 0;
1273 }
1274
1275 static void __close_router_fd(pims_ipc_svc_s *ipc_svc)
1276 {
1277         close(ipc_svc->router);
1278         close(ipc_svc->manager);
1279 }
1280
1281 static void* __publish_loop(void *user_data)
1282 {
1283         int ret;
1284         int epfd;
1285
1286         struct sockaddr_un addr;
1287         struct epoll_event ev = {0};
1288         pims_ipc_svc_for_publish_s *ipc_svc = (pims_ipc_svc_for_publish_s*)user_data;
1289
1290         unlink(ipc_svc->service);
1291         ipc_svc->publish_sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
1292
1293         bzero(&addr, sizeof(struct sockaddr_un));
1294         addr.sun_family = AF_UNIX;
1295         snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service);
1296
1297         int flags = fcntl (ipc_svc->publish_sockfd, F_GETFL, 0);
1298         if (flags == -1)
1299                 flags = 0;
1300         ret = fcntl (ipc_svc->publish_sockfd, F_SETFL, flags | O_NONBLOCK);
1301         VERBOSE("publish socketfd fcntl : %d\n", ret);
1302
1303         ret = bind(ipc_svc->publish_sockfd, (struct sockaddr *)&(addr), sizeof(struct sockaddr_un));
1304         if (ret != 0)
1305                 ERROR("bind error :%d", ret);
1306         ret = listen(ipc_svc->publish_sockfd, 30);
1307         WARN_IF(ret != 0, "listen error :%d", ret);
1308
1309         ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
1310         WARN_IF(ret != 0, "listen error :%d", ret);
1311         ret = chmod(ipc_svc->service, ipc_svc->mode);
1312         WARN_IF(ret != 0, "listen error :%d", ret);
1313
1314         epfd = epoll_create(MAX_EPOLL_EVENT);
1315
1316         ev.events = EPOLLIN | EPOLLHUP;
1317         ev.data.fd = ipc_svc->publish_sockfd;
1318
1319         ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ipc_svc->publish_sockfd, &ev);
1320         WARN_IF(ret != 0, "listen error :%d", ret);
1321
1322         while (!ipc_svc->epoll_stop_thread) {
1323                 int i = 0;
1324                 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
1325                 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1);
1326
1327                 if (ipc_svc->epoll_stop_thread)
1328                         break;
1329
1330                 if (event_num == -1) {
1331                         if (errno != EINTR) {
1332                                 ERROR("errno:%d\n", errno);
1333                                 break;
1334                         }
1335                 }
1336
1337                 for (i = 0; i < event_num; i++) {
1338                         int event_fd = events[i].data.fd;
1339
1340                         if (events[i].events & EPOLLHUP) {
1341                                 VERBOSE("client closed -----------------------------------------:%d", event_fd);
1342                                 if (epoll_ctl(epfd, EPOLL_CTL_DEL, event_fd, events) == -1) {
1343                                         ERROR("epoll_ctl (EPOLL_CTL_DEL) fail : errno(%d)", errno);
1344                                 }
1345                                 close(event_fd);
1346
1347                                 // Find client_id and delete
1348                                 GList *cursor = NULL;
1349
1350                                 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
1351                                 cursor = g_list_first(ipc_svc->subscribe_fds);
1352                                 while (cursor) {
1353                                         if (event_fd == (int)cursor->data) {
1354                                                 ipc_svc->subscribe_fds = g_list_delete_link(ipc_svc->subscribe_fds, cursor);
1355                                                 break;
1356                                         }
1357                                         cursor = g_list_next(cursor);
1358                                 }
1359                                 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
1360                                 continue;
1361                         }
1362                         else if (event_fd == ipc_svc->publish_sockfd) {         // connect client
1363                                 struct sockaddr_un remote;
1364                                 int remote_len = sizeof(remote);
1365                                 int client_fd = accept(ipc_svc->publish_sockfd, (struct sockaddr *)&remote, (socklen_t*) &remote_len);
1366                                 if (client_fd == -1) {
1367                                         ERROR("accept fail : errno : %d", errno);
1368                                         continue;
1369                                 }
1370                                 VERBOSE("client subscriber connect: %d", client_fd);
1371
1372                                 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
1373                                 ipc_svc->subscribe_fds = g_list_append(ipc_svc->subscribe_fds, (void*)client_fd);
1374                                 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
1375
1376                                 ev.events = EPOLLIN;
1377                                 ev.data.fd = client_fd;
1378                                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
1379                                         ERROR("epoll_ctl (EPOLL_CTL_ADD) fail : error(%d)\n", errno);
1380                                         continue;
1381                                 }
1382                         }
1383                 }
1384         }
1385
1386         close(ipc_svc->publish_sockfd);
1387         close(epfd);
1388
1389         return NULL;
1390 }
1391
1392 static void __stop_for_publish(pims_ipc_svc_for_publish_s *ipc_svc)
1393 {
1394         ipc_svc->epoll_stop_thread = true;
1395 }
1396
1397 static void* __router_loop(void *data)
1398 {
1399         pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1400         int fd_count = 2;
1401         struct pollfd *pollfds;
1402
1403         pollfds = (struct pollfd*) malloc (fd_count * sizeof (struct pollfd));
1404
1405         pollfds[0].fd = ipc_svc->router;
1406         pollfds[0].events = POLLIN;
1407         pollfds[1].fd = ipc_svc->manager;
1408         pollfds[1].events = POLLIN;
1409
1410         while (1) {
1411                 int ret = -1;
1412                 uint64_t dummy;
1413                 int check_router_queue = -1;
1414                 int check_manager_queue = -1;
1415
1416                 while (1) {
1417                         ret = poll(pollfds, fd_count, 1000);
1418                         if (ret == -1 && errno == EINTR) {
1419                                 //free (pollfds);
1420                                 continue;               //return NULL;
1421                         }
1422                         break;
1423                 }
1424
1425                 if (ret > 0) {
1426                         if (pollfds[0].revents & POLLIN) {
1427                                 // request router: send request to worker
1428                                 if (sizeof (dummy) == read_command(pollfds[0].fd, &dummy)) {
1429                                         check_router_queue = __process_router_event(ipc_svc, false);
1430                                 }
1431                         }
1432
1433                         if (pollfds[1].revents & POLLIN) {
1434                                 // worker manager
1435                                 if (sizeof (dummy) == read_command(pollfds[1].fd, &dummy)) {
1436                                         check_manager_queue = __process_manager_event(ipc_svc);
1437                                         if (ipc_svc->delay_count > 0)
1438                                                 check_router_queue = __process_router_event(ipc_svc, true);
1439                                 }
1440                         }
1441                 }
1442
1443                 // check queue
1444                 while(check_router_queue > 0 || check_manager_queue > 0) {
1445                         read_command(pollfds[0].fd, &dummy);
1446                         check_router_queue = __process_router_event(ipc_svc, false);
1447
1448                         read_command(pollfds[1].fd, &dummy);
1449                         check_manager_queue = __process_manager_event(ipc_svc);
1450                         if (ipc_svc->delay_count > 0)
1451                                 check_router_queue = __process_router_event(ipc_svc, true);
1452                 }
1453         }
1454
1455         free(pollfds);
1456
1457         return NULL;
1458 }
1459
1460 API void pims_ipc_svc_run_main_loop(GMainLoop* loop)
1461 {
1462         int ret = -1;
1463         GMainLoop* main_loop = loop;
1464
1465         if (main_loop == NULL) {
1466                 main_loop = g_main_loop_new(NULL, FALSE);
1467         }
1468
1469         if (_g_singleton_for_publish)
1470                 __launch_thread(__publish_loop, _g_singleton_for_publish);
1471
1472         if (_g_singleton) {
1473                 ret = __open_router_fd(_g_singleton);
1474                 ASSERT(ret == 0);
1475
1476                 int i;
1477                 // launch worker threads in advance
1478                 for (i = 0; i < _g_singleton->workers_max_count; i++)
1479                         __launch_thread(__worker_loop, _g_singleton);
1480
1481                 __launch_thread(__router_loop, _g_singleton);
1482                 __main_loop(_g_singleton);
1483         }
1484
1485         g_main_loop_run(main_loop);
1486
1487         if (_g_singleton)
1488                 __close_router_fd(_g_singleton);
1489
1490         if (_g_singleton_for_publish)
1491                 __stop_for_publish(_g_singleton_for_publish);
1492
1493 }
1494
1495 API void pims_ipc_svc_set_client_disconnected_cb(pims_ipc_svc_client_disconnected_cb callback, void *user_data)
1496 {
1497         if (_client_disconnected_cb.callback) {
1498                 ERROR("already registered");
1499                 return;
1500         }
1501         _client_disconnected_cb.callback = callback;
1502         _client_disconnected_cb.user_data = user_data;
1503 }
1504
1505