4 * Copyright (c) 2012 - 2015 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
26 #include <poll.h> // pollfds
27 #include <fcntl.h> //fcntl
29 #include <systemd/sd-daemon.h>
33 #include <sys/un.h> // sockaddr_un
34 #include <sys/ioctl.h> // ioctl
35 #include <sys/epoll.h> // epoll
36 #include <sys/eventfd.h> // eventfd
37 #include <sys/socket.h> //socket
38 #include <sys/types.h>
40 #include <cynara-client.h>
41 #include <cynara-session.h>
42 #include <cynara-creds-socket.h>
44 #include "pims-internal.h"
45 #include "pims-debug.h"
46 #include "pims-socket.h"
47 #include "pims-ipc-data.h"
48 #include "pims-ipc-data-internal.h"
49 #include "pims-ipc-svc.h"
51 #define PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT 2
59 GHashTable *cb_table; // key : call_id string, data : pims_ipc_svc_cb_s* (both released with g_free)
61 // Global socket info and epoll thread
63 bool epoll_stop_thread;
65 /////////////////////////////////////////////
66 // router inproc eventfd
68 int delay_count; // no mutex needed
69 // the epoll thread adds client_fd; when a request is received, the router reads the requests
70 GList *request_queue; // list of client_id strings that have a request waiting
71 pthread_mutex_t request_data_queue_mutex;
72 GHashTable *request_data_queue; // key : client id, data : pims_ipc_request_s* (client_fd, request_count, GList of pims_ipc_raw_data_s)
73 // the router adds a client on a connect request and removes it on a disconnect request, in the router thread
74 // the manager removes a client that terminated without a disconnect request, in the router thread
75 GHashTable *client_worker_map; // key : client_id, data : worker_fd (stored with GINT_TO_POINTER), no mutex needed
76 GList *client_id_fd_map; // pims_ipc_client_map_s
77 // key : client_id(pid:seq_no), data : client_fd
79 /////////////////////////////////////////////
80 pthread_mutex_t task_fds_mutex;
81 // when a worker thread starts, it registers its fd
82 // when a worker thread ends, it deregisters its fd
83 GHashTable *task_fds; // worker_fd - worker data (worker fd, client_fd, request queue(GList), stop_thread)
84 int workers_max_count;
86 /////////////////////////////////////////////
87 // manager inproc eventfd
89 // written by a new worker thread, read by the manager in the router thread; needs the mutex
90 pthread_mutex_t manager_queue_from_worker_mutex;
91 GList *manager_queue_from_worker; // worker_fd => add to workers
92 // written in the epoll thread (for a dead client), read by the manager in the router thread; needs the mutex
93 pthread_mutex_t manager_queue_from_epoll_mutex;
94 GList *manager_queue_from_epoll; // entries are read back as client_id strings by __process_manager_event => find worker_fd
95 // managed by the manager; the router takes an idle worker when a new client connects (router thread) => removed from idle workers
96 GList *workers; // idle worker_fd list, no mutex needed
97 /////////////////////////////////////////////
99 pthread_mutex_t cynara_mutex;
101 int unique_sequence_number;
102 pthread_mutex_t client_info_mutex;
103 GHashTable *worker_client_info_map; // key : worker_id, data : pims_ipc_client_info_s*
104 GHashTable *client_info_map; // key : client_id, data : pims_ipc_client_info_s*
110 char *client_session;
111 } pims_ipc_client_info_s ;
119 bool epoll_stop_thread;
120 pthread_mutex_t subscribe_fds_mutex; // taken around accesses to subscribe_fds
121 GList *subscribe_fds; // client fd list (each fd is stored directly in the node's data pointer)
122 } pims_ipc_svc_for_publish_s;
127 }pims_ipc_client_map_s;
130 pims_ipc_svc_call_cb callback;
135 pims_ipc_svc_client_disconnected_cb callback;
137 } pims_ipc_svc_client_disconnected_cb_t;
141 int worker_id; // pthread_self() of the worker thread, cast to int
144 GList *queue; // pims_ipc_raw_data_s list
145 pthread_mutex_t queue_mutex; // taken around accesses to queue
146 } pims_ipc_worker_data_s;
150 unsigned int client_id_len;
153 unsigned int call_id_len;
154 unsigned int is_data;
155 unsigned int data_len;
157 }pims_ipc_raw_data_s;
162 GList *raw_data; // pims_ipc_raw_data_s list
163 pthread_mutex_t raw_data_mutex;
// Service instance created by pims_ipc_svc_init() and released by pims_ipc_svc_deinit().
166 static pims_ipc_svc_s *_g_singleton = NULL;
// Publisher instance created by pims_ipc_svc_init_for_publish() and released by pims_ipc_svc_deinit_for_publish().
167 static pims_ipc_svc_for_publish_s *_g_singleton_for_publish = NULL;
// Client-disconnected callback; declared __thread, so each thread has its own copy.
169 static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL};
/* Releases a pims_ipc_raw_data_s record, including the client_id string it owns. */
171 static void __free_raw_data(pims_ipc_raw_data_s *data)
175 free(data->client_id);
/*
 * Value destructor installed on the task_fds hash table by pims_ipc_svc_init().
 * Walks the worker's pending request queue while holding queue_mutex and
 * releases every queued pims_ipc_raw_data_s with __free_raw_data().
 *
 * data: the pims_ipc_worker_data_s being removed from task_fds.
 */
181 static void __worker_data_free(gpointer data)
183 pims_ipc_worker_data_s *worker_data = (pims_ipc_worker_data_s*)data;
185 pthread_mutex_lock(&worker_data->queue_mutex);
186 if (worker_data->queue) {
187 GList *cursor = g_list_first(worker_data->queue);
190 pims_ipc_raw_data_s *data = l->data;
// step to the next node before the current one is unlinked from the list
191 cursor = g_list_next(cursor);
192 worker_data->queue = g_list_remove_link(worker_data->queue, l);
194 __free_raw_data(data);
197 pthread_mutex_unlock(&worker_data->queue_mutex);
/*
 * Releases a pims_ipc_client_info_s: its smack, uid and client_session strings
 * are freed. Installed as the value destructor of worker_client_info_map and
 * client_info_map, and called directly on error paths.
 *
 * p: the pims_ipc_client_info_s to release; a NULL pointer is checked for first.
 */
201 static void _destroy_client_info(gpointer p)
203 pims_ipc_client_info_s *client_info = p;
205 if (NULL == client_info)
207 free(client_info->smack);
208 free(client_info->uid);
209 free(client_info->client_session);
/*
 * Creates the service singleton (_g_singleton) and all of its containers,
 * mutexes and the cynara handle.
 *
 * service: service name; a copy is kept (g_strdup).
 * group:   stored in the singleton.
 * mode:    stored in the singleton.
 *
 * "Already exist" is logged on the path where initialization is refused.
 * A cynara_initialize() failure is logged together with its cynara_strerror() text.
 */
213 API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode)
216 ERROR("Already exist");
220 _g_singleton = g_new0(pims_ipc_svc_s, 1);
221 ASSERT(_g_singleton);
223 _g_singleton->service = g_strdup(service);
224 _g_singleton->group = group;
225 _g_singleton->mode = mode;
226 _g_singleton->workers_max_count = PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT;
// string keys; both key and value are released with g_free when an entry is removed
227 _g_singleton->cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
228 ASSERT(_g_singleton->cb_table);
230 pthread_mutex_init(&_g_singleton->request_data_queue_mutex, 0);
231 _g_singleton->request_queue = NULL;
// keys are owned by the table (g_free); values have no destroy notifier and are released by the code that removes them
232 _g_singleton->request_data_queue = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // client_id - pims_ipc_request_s
233 ASSERT(_g_singleton->request_data_queue);
234 _g_singleton->client_worker_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // client id - worker_fd mapping
235 ASSERT(_g_singleton->client_worker_map);
236 _g_singleton->delay_count = 0;
238 pthread_mutex_init(&_g_singleton->task_fds_mutex, 0);
// integer keys (worker_fd); removing an entry runs __worker_data_free on the value
239 _g_singleton->task_fds = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __worker_data_free); // pims_ipc_worker_data_s
240 ASSERT(_g_singleton->task_fds);
242 pthread_mutex_init(&_g_singleton->manager_queue_from_epoll_mutex, 0);
243 _g_singleton->manager_queue_from_epoll = NULL;
245 pthread_mutex_init(&_g_singleton->manager_queue_from_worker_mutex, 0);
246 _g_singleton->manager_queue_from_worker = NULL;
247 _g_singleton->workers = NULL;
249 _g_singleton->unique_sequence_number = 0;
// both client-info tables release their values with _destroy_client_info
251 _g_singleton->worker_client_info_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, _destroy_client_info);
252 _g_singleton->client_info_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, _destroy_client_info);
253 pthread_mutex_init(&_g_singleton->client_info_mutex, 0);
255 pthread_mutex_init(&_g_singleton->cynara_mutex, 0);
256 _g_singleton->epoll_stop_thread = false;
258 int ret = cynara_initialize(&_g_singleton->cynara, NULL);
259 if (CYNARA_API_SUCCESS != ret) {
260 char errmsg[1024] = {0};
261 cynara_strerror(ret, errmsg, sizeof(errmsg));
262 ERROR("cynara_initialize() Fail(%d,%s)", ret, errmsg);
/*
 * Tears down the service singleton: destroys its mutexes, hash tables and
 * lists, finishes the cynara handle and finally frees the singleton itself.
 * The hash tables release their contents through the destroy notifiers that
 * pims_ipc_svc_init() installed.
 */
268 API int pims_ipc_svc_deinit(void)
273 g_free(_g_singleton->service);
274 g_hash_table_destroy(_g_singleton->cb_table);
276 pthread_mutex_destroy(&_g_singleton->request_data_queue_mutex);
277 g_hash_table_destroy(_g_singleton->client_worker_map);
278 g_hash_table_destroy(_g_singleton->request_data_queue);
// request_queue elements are g_strdup'ed client ids, so they are freed with the list
279 g_list_free_full(_g_singleton->request_queue, g_free);
281 pthread_mutex_destroy(&_g_singleton->task_fds_mutex);
282 g_hash_table_destroy(_g_singleton->task_fds);
284 pthread_mutex_destroy(&_g_singleton->manager_queue_from_epoll_mutex);
285 g_list_free_full(_g_singleton->manager_queue_from_epoll, g_free);
286 pthread_mutex_destroy(&_g_singleton->manager_queue_from_worker_mutex);
// elements are worker fds stored in the pointer itself, so only the list nodes are freed
287 g_list_free(_g_singleton->manager_queue_from_worker);
// drain client_id_fd_map one head node at a time
289 GList *cursor = g_list_first(_g_singleton->client_id_fd_map);
291 pims_ipc_client_map_s *client = cursor->data;
292 _g_singleton->client_id_fd_map = g_list_remove_link(_g_singleton->client_id_fd_map, cursor); //free(client_id);
296 cursor = g_list_first(_g_singleton->client_id_fd_map);
298 g_list_free(_g_singleton->client_id_fd_map);
300 pthread_mutex_destroy(&_g_singleton->client_info_mutex);
301 g_hash_table_destroy(_g_singleton->worker_client_info_map);
302 g_hash_table_destroy(_g_singleton->client_info_map);
// cynara_finish() is called while holding cynara_mutex; a failure is only logged
304 pthread_mutex_lock(&_g_singleton->cynara_mutex);
305 int ret = cynara_finish(_g_singleton->cynara);
306 if (CYNARA_API_SUCCESS != ret) {
307 char errmsg[1024] = {0};
308 cynara_strerror(ret, errmsg, sizeof(errmsg));
309 ERROR("cynara_finish() Fail(%d,%s)", ret, errmsg);
311 pthread_mutex_unlock(&_g_singleton->cynara_mutex);
312 pthread_mutex_destroy(&_g_singleton->cynara_mutex);
314 g_list_free(_g_singleton->workers);
315 g_free(_g_singleton);
/*
 * Registers a service callback under the call id built from module and
 * function with PIMS_IPC_MAKE_CALL_ID().
 *
 * module, function: identify the call; must not be NULL.
 * callback:         handler later invoked by __run_callback(); must not be NULL.
 * userdata:         passed back to callback unchanged.
 *
 * A NULL module, function or callback is reported as "Invalid argument".
 */
321 API int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata)
323 pims_ipc_svc_cb_s *cb_data = NULL;
324 gchar *call_id = NULL;
326 if (!module || !function || !callback) {
327 ERROR("Invalid argument");
330 cb_data = g_new0(pims_ipc_svc_cb_s, 1);
331 call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
333 VERBOSE("register cb id[%s]", call_id);
334 cb_data->callback = callback;
335 cb_data->user_data = userdata;
// cb_table was created with g_free for keys and values, so it takes ownership of call_id and cb_data
336 g_hash_table_insert(_g_singleton->cb_table, call_id, cb_data);
/*
 * Creates the publisher singleton (_g_singleton_for_publish).
 *
 * service: service name; a copy is kept (g_strdup).
 * group:   stored in the singleton.
 * mode:    stored in the singleton.
 *
 * When the singleton already exists, "Already exist" is logged.
 */
341 API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode)
343 if (_g_singleton_for_publish) {
344 ERROR("Already exist");
348 _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_s, 1);
349 _g_singleton_for_publish->service = g_strdup(service);
350 _g_singleton_for_publish->group = group;
351 _g_singleton_for_publish->mode = mode;
352 _g_singleton_for_publish->subscribe_fds = NULL;
354 pthread_mutex_init(&_g_singleton_for_publish->subscribe_fds_mutex, 0);
/*
 * Releases the publisher singleton: destroys subscribe_fds_mutex, frees the
 * subscriber list nodes and the service name, then resets
 * _g_singleton_for_publish to NULL so that it can be initialized again.
 * The case where no publisher singleton exists is checked first.
 */
359 API int pims_ipc_svc_deinit_for_publish(void)
361 if (!_g_singleton_for_publish)
364 pthread_mutex_destroy(&_g_singleton_for_publish->subscribe_fds_mutex);
// subscribe_fds stores fds in the data pointers, so only the nodes are freed
365 g_list_free(_g_singleton_for_publish->subscribe_fds);
367 g_free(_g_singleton_for_publish->service);
368 g_free(_g_singleton_for_publish);
369 _g_singleton_for_publish = NULL;
/*
 * Publishes an event to every subscribed client fd.
 *
 * module, event: combined into the call id with PIMS_IPC_MAKE_CALL_ID().
 * data:          optional payload (a pims_ipc_data_s behind the handle).
 *
 * Header written into buf, each integer as an unsigned int:
 *   total_len | call_id_len | call_id bytes | is_data | buf_size (payload case)
 * The header goes out with socket_send(); the payload bytes follow with
 * socket_send_data(). Send failures are logged.
 *
 * NOTE(review): subscribe_fds_mutex is released while sending and re-taken to
 * advance the cursor, so the current list node is referenced without the lock
 * held. Confirm subscribe_fds cannot be modified concurrently.
 */
374 API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data)
376 pims_ipc_svc_for_publish_s *ipc_svc = _g_singleton_for_publish;
377 gboolean is_valid = FALSE;
378 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
379 pims_ipc_data_s *data_in = (pims_ipc_data_s*)data;
380 unsigned int call_id_len = strlen(call_id);
381 unsigned int is_data = FALSE;
// len is the header size; total_len additionally counts the payload bytes
385 unsigned int len = sizeof(unsigned int) // total size
386 + call_id_len + sizeof(unsigned int) // call_id
387 + sizeof(unsigned int); // is data
388 unsigned int total_len = len;
392 len += sizeof(unsigned int);
393 total_len = len + data_in->buf_size; // data
398 memset(buf, 0x0, len+1);
// field 1: total message length
401 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
402 length += sizeof(unsigned int);
// field 2: call id, length-prefixed and without a terminating NUL
405 memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
406 length += sizeof(unsigned int);
407 memcpy(buf+length, (void*)(call_id), call_id_len);
408 length += call_id_len;
// field 3: flag telling the receiver whether a payload follows
412 memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
413 length += sizeof(unsigned int);
// field 4: payload size
417 memcpy(buf+length, (void*)&(data_in->buf_size), sizeof(unsigned int));
418 length += sizeof(unsigned int);
421 // Publish to clients
422 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
423 GList *cursor = g_list_first(ipc_svc->subscribe_fds);
// the fd is stored directly in the node's data pointer
426 int fd = (int)cursor->data;
427 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
428 ret = socket_send(fd, buf, length);
430 ERROR("socket_send publish error : %d", ret);
434 ret = socket_send_data(fd, data_in->buf, data_in->buf_size);
436 ERROR("socket_send_data publish error : %d", ret);
439 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
440 cursor = g_list_next(cursor);
442 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
447 if (is_valid == FALSE)
/*
 * Looks up the callback registered for call_id in cb_table and invokes it.
 *
 * worker_id:   passed to the callback as its pims_ipc_h handle (plain cast).
 * call_id:     key built by PIMS_IPC_MAKE_CALL_ID() at registration time.
 * dhandle_in:  request data handed to the callback.
 * dhandle_out: receives the callback's response data.
 *
 * An unknown call_id is logged ("unable to find ...").
 */
452 static void __run_callback(int worker_id, char *call_id, pims_ipc_data_h dhandle_in, pims_ipc_data_h *dhandle_out)
454 pims_ipc_svc_cb_s *cb_data = NULL;
456 VERBOSE("Call id [%s]", call_id);
458 cb_data = (pims_ipc_svc_cb_s*)g_hash_table_lookup(_g_singleton->cb_table, call_id);
459 if (cb_data == NULL) {
460 VERBOSE("unable to find %s", call_id);
464 cb_data->callback((pims_ipc_h)worker_id, dhandle_in, dhandle_out, cb_data->user_data);
/*
 * Builds a heap-allocated pims_ipc_raw_data_s for a response.
 *
 * call_id: duplicated into the record (strdup).
 * seq_no:  copied into the record.
 * data:    optional payload; when it has buf_size > 0 its bytes are copied into
 *          a fresh buffer, otherwise the record is marked as carrying no data.
 * out:     destination for the new record; a NULL out is reported as
 *          "Invalid parameter:out is NULL".
 *
 * Allocation failures are logged with "calloc() Fail".
 */
467 static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data, pims_ipc_raw_data_s **out)
470 ERROR("Invalid parameter:out is NULL");
474 pims_ipc_raw_data_s *raw_data = NULL;
475 raw_data = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s));
476 if (NULL == raw_data) {
477 ERROR("calloc() Fail");
480 pims_ipc_data_s *data_in = (pims_ipc_data_s*)data;
482 raw_data->call_id = strdup(call_id);
483 raw_data->call_id_len = strlen(raw_data->call_id);
484 raw_data->seq_no = seq_no;
486 if (data_in && data_in->buf_size > 0) {
487 raw_data->is_data = TRUE;
// one extra zeroed byte is allocated past the payload
488 raw_data->data = calloc(1, data_in->buf_size+1);
489 if (NULL == raw_data->data) {
490 ERROR("calloc() Fail");
// undo the partial construction: the duplicated call_id is released
491 free(raw_data->call_id);
495 memcpy(raw_data->data, data_in->buf, data_in->buf_size);
496 raw_data->data_len = data_in->buf_size;
499 raw_data->is_data = FALSE;
500 raw_data->data_len = 0;
501 raw_data->data = NULL;
/*
 * Serializes a response record and writes it to a client socket.
 *
 * fd:        socket to write to.
 * client_id: NUL-terminated client identifier, sent length-prefixed.
 * data:      record to send; a missing record is logged ("No data to send NULL").
 *
 * Header layout, each integer as an unsigned int:
 *   total_len | client_id_len | client_id | seq_no | call_id_len | call_id |
 *   is_data | data_len (payload case)
 * In the payload case the header is sent with socket_send() and the payload
 * with socket_send_data(), whose result is added to ret.
 */
507 static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data)
510 unsigned int client_id_len = strlen(client_id);
513 INFO("No data to send NULL\n");
// len is the header size; total_len additionally counts the payload bytes
517 unsigned int len = sizeof(unsigned int) // total size
518 + client_id_len + sizeof(unsigned int) // client_id
519 + sizeof(unsigned int) // seq_no
520 + data->call_id_len + sizeof(unsigned int) // call_id
521 + sizeof(unsigned int); // is data
522 unsigned int total_len = len;
525 len += sizeof(unsigned int); // data
526 total_len = len + data->data_len; // data
529 INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id, data->call_id, data->seq_no, len, total_len);
534 memset(buf, 0x0, len+1);
// total message length
537 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
538 length += sizeof(unsigned int);
// client id, length-prefixed and without a terminating NUL
541 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
542 length += sizeof(unsigned int);
543 memcpy(buf+length, (void*)(client_id), client_id_len);
544 length += client_id_len;
// sequence number of the request being answered
547 memcpy(buf+length, (void*)&(data->seq_no), sizeof(unsigned int));
548 length += sizeof(unsigned int);
// call id, length-prefixed
551 memcpy(buf+length, (void*)&(data->call_id_len), sizeof(unsigned int));
552 length += sizeof(unsigned int);
553 memcpy(buf+length, (void*)(data->call_id), data->call_id_len);
554 length += data->call_id_len;
// payload flag
557 memcpy(buf+length, (void*)&(data->is_data), sizeof(unsigned int));
558 length += sizeof(unsigned int);
// payload length, followed on the wire by the payload itself
561 memcpy(buf+length, (void*)&(data->data_len), sizeof(unsigned int));
562 length += sizeof(unsigned int);
563 ret = socket_send(fd, buf, length);
567 ret += socket_send_data(fd, data->data, data->data_len);
570 ret = socket_send(fd, buf, length);
/*
 * Removes the oldest request from a worker's queue, under queue_mutex.
 *
 * worker: worker whose queue is read.
 * data:   receives the dequeued record; the caller becomes responsible for it.
 *
 * When the queue is empty the mutex is released without touching *data.
 */
575 static gboolean __worker_raw_data_pop(pims_ipc_worker_data_s *worker, pims_ipc_raw_data_s **data)
580 pthread_mutex_lock(&worker->queue_mutex);
581 if (!worker->queue) {
582 pthread_mutex_unlock(&worker->queue_mutex);
587 *data = g_list_first(worker->queue)->data;
// unlink and free the head node only; the record it pointed to was handed out above
588 worker->queue = g_list_delete_link(worker->queue, g_list_first(worker->queue));
589 pthread_mutex_unlock(&worker->queue_mutex);
/*
 * Thread entry point of a worker.
 *
 * data: the pims_ipc_svc_s the worker belongs to.
 *
 * Start-up: creates an eventfd (worker_fd), registers a pims_ipc_worker_data_s
 * for it in task_fds, announces the fd to the manager through
 * manager_queue_from_worker and wakes the manager with write_command().
 *
 * Main loop: polls worker_fd with a 3000 ms timeout until stop_thread is set.
 * On POLLIN it pops one request from its queue, runs the registered callback
 * for call ids other than PIMS_IPC_CALL_ID_CREATE / PIMS_IPC_CALL_ID_DESTROY,
 * builds a response with __make_raw_data() and sends it to client_fd when one
 * is attached.
 *
 * Shutdown: removes its entries from worker_client_info_map and task_fds,
 * frees the pollfd array and invokes the thread-local client-disconnected
 * callback when one is set.
 */
594 static void* __worker_loop(void *data)
599 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
600 pims_ipc_worker_data_s *worker_data;
601 bool disconnected = false;
603 worker_fd = eventfd(0, 0);
// NOTE(review): pthread_t is narrowed to int here; confirm the id stays unique on the target platform
606 worker_id = (int)pthread_self();
608 worker_data = calloc(1, sizeof(pims_ipc_worker_data_s));
609 if (NULL == worker_data) {
610 ERROR("calloc() Fail");
614 worker_data->fd = worker_fd;
615 worker_data->worker_id = worker_id;
// no client is attached yet; __worker_raw_data_push() sets client_fd with the first request
616 worker_data->client_fd = -1;
617 worker_data->stop_thread = false;
618 pthread_mutex_init(&worker_data->queue_mutex, 0);
// register this worker; task_fds owns worker_data from here on (value destructor: __worker_data_free)
620 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
621 g_hash_table_insert(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd), worker_data);
622 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
// hand the new worker_fd to the manager, which moves it into the idle workers list
624 pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex);
625 ipc_svc->manager_queue_from_worker = g_list_append(ipc_svc->manager_queue_from_worker, (void*)worker_fd);
626 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
628 write_command(ipc_svc->manager, 1);
629 DEBUG("worker register to manager : worker_id(%08x00), worker_fd(%d)\n", worker_id, worker_fd);
631 struct pollfd *pollfds = (struct pollfd*)calloc(1, sizeof(struct pollfd));
632 if (NULL == pollfds) {
633 ERROR("calloc() Fail");
// NOTE(review): unlike the removal at shutdown, this removal is not bracketed by task_fds_mutex
634 g_hash_table_remove(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd));
639 pollfds[0].fd = worker_fd;
640 pollfds[0].events = POLLIN;
642 while (!worker_data->stop_thread) {
644 if (worker_data->stop_thread)
646 ret = poll(pollfds, 1, 3000); // waiting command from router
// a poll() interrupted by a signal is handled separately from other results
647 if (ret == -1 && errno == EINTR) {
653 if (worker_data->stop_thread)
657 pims_ipc_raw_data_s *raw_data = NULL;
658 pims_ipc_raw_data_s *result = NULL;
660 if (pollfds[0].revents & POLLIN) {
// consume the wake-up written with write_command() before looking at the queue
662 read_command(pollfds[0].fd, &dummy);
663 if (__worker_raw_data_pop(worker_data, &raw_data)) {
664 pims_ipc_data_h data_in = NULL;
665 pims_ipc_data_h data_out = NULL;
666 if (strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id) == 0) {
669 else if (strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id) == 0) {
// presumably the unmarshal call takes over the buffer; the record's data fields are cleared right after
673 data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len);
674 raw_data->data = NULL;
675 raw_data->data_len = 0;
676 raw_data->is_data = false;
677 __run_callback(worker_id, raw_data->call_id, data_in, &data_out);
678 pims_ipc_data_destroy(data_in);
// response carrying the callback's output
682 __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result);
683 pims_ipc_data_destroy(data_out);
// response without payload
686 __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result);
// client_fd is -1 when no client is attached; the response is then not sent
688 if (worker_data->client_fd != -1)
689 __send_raw_data(worker_data->client_fd, raw_data->client_id, result);
690 __free_raw_data(raw_data);
691 __free_raw_data(result);
698 ERROR("client fd closed, worker_fd : %d", worker_fd);
699 INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd);
// drop the client info that was attached to this worker
701 pthread_mutex_lock(&ipc_svc->client_info_mutex);
702 g_hash_table_remove(ipc_svc->worker_client_info_map, GINT_TO_POINTER(worker_id));
703 pthread_mutex_unlock(&ipc_svc->client_info_mutex);
705 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
706 g_hash_table_remove(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd)); // __worker_data_free will be called
707 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
710 free ((void*)pollfds);
// _client_disconnected_cb is thread-local, so this is the callback set for this worker thread
712 if (_client_disconnected_cb.callback)
713 _client_disconnected_cb.callback((pims_ipc_h)worker_id, _client_disconnected_cb.user_data);
/*
 * Starts start_routine(data) on a new detached thread created with
 * system contention scope (PTHREAD_SCOPE_SYSTEM).
 *
 * Failures of pthread_attr_init(), pthread_attr_setscope() and
 * pthread_create() are logged together with the returned error code.
 */
718 static void __launch_thread(void *(*start_routine) (void *), void *data)
726 ret = pthread_attr_init(&attr);
728 ERROR("pthread_attr_init() Fail(%d)", ret);
731 ret = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
733 ERROR("pthread_attr_setscope() Fail(%d)", ret);
736 ret = pthread_create(&worker, &attr, start_routine, data);
738 ERROR("pthread_create() Fail(%d)", ret);
// the thread is never joined; detaching lets its resources be reclaimed when it exits
741 pthread_detach(worker);
/* Tells whether the idle worker list (_g_singleton->workers) is non-empty. */
744 static gboolean __is_worker_available()
746 if (_g_singleton->workers)
/*
 * Takes the first idle worker and binds it to a client.
 *
 * client_id: client to bind; a copy becomes the key in client_worker_map.
 * worker_fd: receives the eventfd of the chosen worker.
 *
 * When no idle worker exists, "There is no idle worker" is logged.
 */
752 static int __get_worker(const char *client_id, int *worker_fd)
757 if (!__is_worker_available()) {
758 ERROR("There is no idle worker");
// the fd is stored directly in the list node's data pointer
761 *worker_fd = (int)(g_list_first(_g_singleton->workers)->data);
// the worker is no longer idle: drop it from the list
762 _g_singleton->workers = g_list_delete_link(_g_singleton->workers,
763 g_list_first(_g_singleton->workers));
765 g_hash_table_insert(_g_singleton->client_worker_map, g_strdup(client_id), GINT_TO_POINTER(*worker_fd));
/*
 * Looks up the worker already bound to a client in client_worker_map.
 *
 * client_id: client to look up.
 * worker_fd: receives the worker's eventfd when an entry exists.
 *
 * A missing entry is logged ("unable to find worker id for ...").
 */
770 static int __find_worker(const char *client_id, int *worker_fd)
772 char *orig_pid = NULL;
// the extended lookup also returns the stored key, which is not used further in the code shown here
778 if (FALSE == g_hash_table_lookup_extended(_g_singleton->client_worker_map, client_id,
779 (gpointer*)&orig_pid, (gpointer*)&fd)) {
780 VERBOSE("unable to find worker id for %s", client_id);
784 *worker_fd = GPOINTER_TO_INT(fd);
/*
 * Removes the oldest pending request of one client, under raw_data_mutex.
 *
 * data_queue: per-client request queue.
 * data:       receives the dequeued record.
 *
 * request_count is decremented together with the removal.
 */
788 static bool __request_pop(pims_ipc_request_s *data_queue, pims_ipc_raw_data_s **data)
793 pthread_mutex_lock(&data_queue->raw_data_mutex);
794 cursor = g_list_first(data_queue->raw_data);
796 *data = cursor->data;
// frees the list node only; the record now belongs to the caller
797 data_queue->raw_data = g_list_delete_link(data_queue->raw_data, cursor);
798 (data_queue->request_count)--;
805 pthread_mutex_unlock(&data_queue->raw_data_mutex);
/*
 * Appends a request to a worker's queue and records which client socket the
 * worker should answer on. Both updates happen under queue_mutex.
 *
 * worker_data: target worker.
 * client_fd:   socket of the requesting client.
 * data:        request record; the queue holds the pointer, no copy is made.
 */
809 static bool __worker_raw_data_push(pims_ipc_worker_data_s *worker_data, int client_fd, pims_ipc_raw_data_s *data)
811 pthread_mutex_lock(&worker_data->queue_mutex);
812 worker_data->queue = g_list_append(worker_data->queue, data);
813 worker_data->client_fd = client_fd;
814 pthread_mutex_unlock(&worker_data->queue_mutex);
/*
 * Translates a worker eventfd into the worker's id by reading the task_fds
 * entry under task_fds_mutex.
 *
 * ipc_svc:   service instance.
 * worker_fd: key into task_fds.
 *
 * A missing entry is logged and the mutex is released before leaving.
 */
819 static int _find_worker_id(pims_ipc_svc_s *ipc_svc, int worker_fd)
822 pims_ipc_worker_data_s *worker_data = NULL;
823 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
824 worker_data = g_hash_table_lookup(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd));
825 if (NULL == worker_data) {
826 ERROR("g_hash_table_lookup(%d) return NULL", worker_fd);
827 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
// copy the id out while the lock is still held
830 worker_id = worker_data->worker_id;
831 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
/*
 * Collects the cynara credentials of the peer connected on a socket.
 *
 * fd:            client socket.
 * p_client_info: receives a newly allocated pims_ipc_client_info_s holding the
 *                smack label, the uid string and the cynara session string
 *                derived from the peer's pid.
 *
 * Every failing step logs its error and releases the partially filled record
 * with _destroy_client_info().
 */
835 static int _create_client_info(int fd, pims_ipc_client_info_s **p_client_info)
839 char errmsg[1024] = {0};
841 pims_ipc_client_info_s *client_info = calloc(1, sizeof(pims_ipc_client_info_s));
842 if (NULL == client_info) {
843 ERROR("calloc() return NULL");
// client identity, using the SMACK method
847 ret = cynara_creds_socket_get_client(fd, CLIENT_METHOD_SMACK, &(client_info->smack));
848 if (CYNARA_API_SUCCESS != ret) {
849 cynara_strerror(ret, errmsg, sizeof(errmsg));
850 ERROR("cynara_creds_socket_get_client() Fail(%d,%s)", ret, errmsg);
851 _destroy_client_info(client_info);
// user identity, using the UID method
855 ret = cynara_creds_socket_get_user(fd, USER_METHOD_UID, &(client_info->uid));
856 if (CYNARA_API_SUCCESS != ret) {
857 cynara_strerror(ret, errmsg, sizeof(errmsg));
858 ERROR("cynara_creds_socket_get_user() Fail(%d,%s)", ret, errmsg);
859 _destroy_client_info(client_info);
// the peer pid is only needed to derive the session string below
863 ret = cynara_creds_socket_get_pid(fd, &pid);
864 if (CYNARA_API_SUCCESS != ret) {
865 cynara_strerror(ret, errmsg, sizeof(errmsg));
866 ERROR("cynara_creds_socket_get_pid() Fail(%d,%s)", ret, errmsg);
867 _destroy_client_info(client_info);
871 client_info->client_session = cynara_session_from_pid(pid);
872 if (NULL == client_info->client_session) {
873 ERROR("cynara_session_from_pid() return NULL");
874 _destroy_client_info(client_info);
// success: the record is handed to the caller
877 *p_client_info = client_info;
/*
 * Makes a deep copy of a client info record: each of smack, uid and
 * client_session that is set in the source is duplicated with strdup().
 *
 * client_info: record to copy; NULL is logged as "client_info is NULL".
 *
 * When an allocation fails, the partially built clone is released with
 * _destroy_client_info().
 */
882 static pims_ipc_client_info_s* _clone_client_info(pims_ipc_client_info_s *client_info)
884 if (NULL == client_info) {
885 ERROR("client_info is NULL");
889 pims_ipc_client_info_s *clone = calloc(1, sizeof(pims_ipc_client_info_s));
891 ERROR("calloc() Fail");
// fields that are NULL in the source stay NULL in the clone (calloc zeroed it)
895 if (client_info->smack) {
896 clone->smack = strdup(client_info->smack);
897 if (NULL == clone->smack) {
898 ERROR("strdup() Fail");
899 _destroy_client_info(clone);
904 if (client_info->uid) {
905 clone->uid = strdup(client_info->uid);
906 if (NULL == clone->uid) {
907 ERROR("strdup() Fail");
908 _destroy_client_info(clone);
913 if (client_info->client_session) {
914 clone->client_session = strdup(client_info->client_session);
915 if (NULL == clone->client_session) {
916 ERROR("strdup() Fail");
917 _destroy_client_info(clone);
/*
 * Router step: takes the client id at the head of request_queue, finds that
 * client's pending request list and forwards one request to a worker.
 *
 * ipc_svc:   service instance.
 * for_queue: NOTE(review): its use is not visible in the lines shown here,
 *            apart from sitting next to the delay_count decrement. Confirm
 *            against callers.
 *
 * For a PIMS_IPC_CALL_ID_CREATE request a new worker thread is launched and an
 * idle worker is bound to the client; when none is idle yet, delay_count is
 * incremented. For other call ids the worker already bound to the client is
 * looked up. The request is then moved from the client's queue to the worker's
 * queue and the worker is woken with write_command().
 */
926 static int __process_router_event(pims_ipc_svc_s *ipc_svc, gboolean for_queue)
928 gboolean is_valid = FALSE;
929 pims_ipc_request_s *data_queue = NULL;
930 GList *queue_cursor = NULL;
932 char *client_id = NULL;
// peek at the next client that has a request waiting
937 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
938 queue_cursor = g_list_first(ipc_svc->request_queue);
939 if (NULL == queue_cursor) {
940 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
943 client_id = (char *)(queue_cursor->data);
944 ASSERT(client_id != NULL);
945 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
// NOTE(review): this lookup runs after request_data_queue_mutex was released, while __request_push() accesses the same table with the mutex held
947 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd, (gpointer*)&data_queue);
950 ipc_svc->delay_count--;
952 if (ret == TRUE && data_queue) {
954 pims_ipc_worker_data_s *worker_data = NULL;
// inspect the client's oldest request; it is only removed later by __request_pop()
956 pthread_mutex_lock(&data_queue->raw_data_mutex);
957 GList *cursor = g_list_first(data_queue->raw_data);
959 pthread_mutex_unlock(&data_queue->raw_data_mutex);
963 pims_ipc_raw_data_s *data = (pims_ipc_raw_data_s*)(cursor->data);
965 ERROR("data is NULL");
966 pthread_mutex_unlock(&data_queue->raw_data_mutex);
969 char *call_id = data->call_id;
970 int client_fd = data_queue->client_fd;
972 ASSERT(call_id != NULL);
974 VERBOSE("call_id = [%s]", call_id);
975 if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0) {
976 // Launch a worker thread, then try to take an idle worker; if none is idle yet, count the request as delayed
977 __launch_thread(__worker_loop, ipc_svc);
978 if (__get_worker((const char*)client_id, &worker_fd) != 0) {
979 ipc_svc->delay_count++;
980 pthread_mutex_unlock(&data_queue->raw_data_mutex);
// attach a private copy of the client's credentials to the worker that will serve it
984 int worker_id = _find_worker_id(ipc_svc, worker_fd);
985 pthread_mutex_lock(&ipc_svc->client_info_mutex);
986 pims_ipc_client_info_s *client_info = g_hash_table_lookup(ipc_svc->client_info_map, client_id);
987 pims_ipc_client_info_s *client_info_clone = _clone_client_info(client_info);
988 g_hash_table_insert(ipc_svc->worker_client_info_map, GINT_TO_POINTER(worker_id), client_info_clone);
989 pthread_mutex_unlock(&ipc_svc->client_info_mutex);
// any other call id must come from a client that already has a worker
993 if (__find_worker((const char*)client_id, &worker_fd) != 0) {
994 ERROR("unable to find a worker");
995 pthread_mutex_unlock(&data_queue->raw_data_mutex);
999 pthread_mutex_unlock(&data_queue->raw_data_mutex);
1001 VERBOSE("routing client_id : %s, seq_no: %d, client_fd = %d, worker fd = %d", client_id, data->seq_no, client_fd, worker_fd);
// task_fds_mutex stays held while the request is handed over, so the worker entry cannot be removed meanwhile
1006 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
1007 if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds,
1008 GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) {
1009 ERROR("hash lookup fail : worker_fd (%d)", worker_fd);
1010 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
// move the request to the worker's queue and wake the worker
1014 if (__request_pop(data_queue, &data)) {
1015 __worker_raw_data_push(worker_data, client_fd, data);
1016 write_command(worker_fd, 1);
1019 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
// this client's entry at the head of request_queue has been served
1022 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
1024 ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, queue_cursor);
1025 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
1030 if (is_valid == FALSE)
/*
 * Manager step, handling two kinds of queued events:
 *  1. manager_queue_from_epoll: a client went away without a disconnect
 *     request. Its worker is looked up, the client is removed from
 *     client_worker_map and the worker is told to stop (stop_thread = true,
 *     client_fd = -1, then woken with write_command()).
 *  2. manager_queue_from_worker: newly started workers announce their fd;
 *     each fd is moved into the idle workers list.
 *
 * ipc_svc: service instance.
 */
1036 static int __process_manager_event(pims_ipc_svc_s *ipc_svc)
1038 GList *cursor = NULL;
1041 // client socket terminated without disconnect request
1042 pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex);
1043 if (ipc_svc->manager_queue_from_epoll) {
1044 cursor = g_list_first(ipc_svc->manager_queue_from_epoll);
1045 char *client_id = (char*)cursor->data;
1046 __find_worker(client_id, &worker_fd);
// g_list_delete_link() frees the node only; client_id itself stays valid for the code below
1048 ipc_svc->manager_queue_from_epoll = g_list_delete_link(ipc_svc->manager_queue_from_epoll, cursor);
1049 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
1052 g_hash_table_remove(ipc_svc->client_worker_map, client_id);
1055 // stop worker thread
1058 pims_ipc_worker_data_s *worker_data;
1060 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
1061 if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds,
1062 GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) {
1063 ERROR("g_hash_table_lookup_extended fail : worker_fd (%d)", worker_fd);
1064 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
// the worker loop tests stop_thread; clearing client_fd keeps it from answering on the closed socket
1067 worker_data->stop_thread = true;
1068 worker_data->client_fd = -1;
1069 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
// wake the worker so that it notices stop_thread without waiting for its poll timeout
1071 write_command(worker_fd, 1);
1072 VERBOSE("write command to worker terminate (worker_fd : %d)", worker_fd);
1076 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
1078 // move newly registered workers into the idle workers list
1079 pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex);
1080 if (ipc_svc->manager_queue_from_worker) {
1082 cursor = g_list_first(ipc_svc->manager_queue_from_worker);
1084 worker_fd = (int)cursor->data;
1085 ipc_svc->manager_queue_from_worker = g_list_delete_link(ipc_svc->manager_queue_from_worker, cursor);
1088 DEBUG("add idle worker_fd : %d", worker_fd);
1089 ipc_svc->workers = g_list_append(ipc_svc->workers, (void*)worker_fd);
1091 cursor = g_list_first(ipc_svc->manager_queue_from_worker);
1093 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
1096 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
1101 // if delete == true, the client_id is stolen from the map: the caller owns it and must free(client_id)
1102 // if delete == false, the returned client_id still belongs to the map: the caller must not call free(client_id)
// Searches client_id_fd_map for the entry whose fd equals client_fd and returns its id through client_id.
1103 static int __find_client_id(pims_ipc_svc_s *ipc_svc, int client_fd, bool delete, char **client_id)
1105 pims_ipc_client_map_s *client;
1106 GList *cursor = NULL;
1107 cursor = g_list_first(ipc_svc->client_id_fd_map);
1109 client = cursor->data;
1110 if (client->fd == client_fd) {
1111 *client_id = client->id;
1114 ipc_svc->client_id_fd_map = g_list_delete_link(ipc_svc->client_id_fd_map, cursor); //free(client);
1119 cursor = g_list_next(cursor);
/*
 * Queues an incoming request for the router.
 *
 * ipc_svc:   service instance.
 * client_id: sender; copies of it are used as table key and queue entry.
 * client_fd: socket the request arrived on; remembered in the per-client queue.
 * data:      request record; NULL is logged as "data is NULL". The queue keeps
 *            the pointer, no copy is made.
 *
 * A per-client pims_ipc_request_s is created on first use and stored in
 * request_data_queue. The client id is appended to request_queue so that the
 * router knows this client has work pending.
 */
1124 static void __request_push(pims_ipc_svc_s *ipc_svc, char *client_id, int client_fd, pims_ipc_raw_data_s *data)
1128 pims_ipc_request_s *data_queue = NULL;
1130 ERROR("data is NULL");
1134 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
1135 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue);
1136 if (ret == TRUE && data_queue) {
// first request of this client: create its queue
1139 data_queue = calloc(1, sizeof(pims_ipc_request_s));
1140 if (NULL == data_queue) {
1141 ERROR("calloc() Fail");
1142 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
1145 data_queue->request_count = 0;
1146 pthread_mutex_init(&data_queue->raw_data_mutex, 0);
1148 g_hash_table_insert(ipc_svc->request_data_queue, g_strdup(client_id), data_queue);
// one request_queue entry per pushed request
1150 ipc_svc->request_queue = g_list_append(ipc_svc->request_queue, g_strdup(client_id));
1151 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
// the per-client list has its own lock, taken after the table lock was released
1153 pthread_mutex_lock(&data_queue->raw_data_mutex);
1154 data_queue->raw_data = g_list_append(data_queue->raw_data, data);
1155 data_queue->client_fd = client_fd;
1156 data_queue->request_count++;
1157 pthread_mutex_unlock(&data_queue->raw_data_mutex);
/*
 * Discards everything queued for one client.
 *
 * ipc_svc:   service instance.
 * client_id: client whose pending requests are dropped.
 *
 * Under request_data_queue_mutex the client's entry is removed from
 * request_data_queue and matching ids are removed from request_queue. After
 * that the client's own request list is emptied, each record being released
 * with __free_raw_data(), and its mutex is destroyed.
 */
1160 static void __delete_request_queue(pims_ipc_svc_s *ipc_svc, char *client_id)
1162 pims_ipc_request_s *data_queue = NULL;
1168 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
1169 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue);
// the table has no value destructor, so data_queue itself is still valid after the removal
1171 g_hash_table_remove(ipc_svc->request_data_queue, (void*)client_id);
1173 cursor = g_list_first(ipc_svc->request_queue);
// advance first: the node just visited may be deleted below
1177 cursor = g_list_next(cursor);
1178 if (id && strcmp(id, client_id) == 0) {
1180 ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, l);
1183 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
1186 pthread_mutex_lock(&data_queue->raw_data_mutex);
1187 cursor = g_list_first(data_queue->raw_data);
1188 pims_ipc_raw_data_s *data;
1191 data = (pims_ipc_raw_data_s *)cursor->data;
1192 cursor = g_list_next(cursor);
1193 data_queue->raw_data = g_list_delete_link(data_queue->raw_data, l);
1194 __free_raw_data(data);
1196 g_list_free(data_queue->raw_data);
1197 pthread_mutex_unlock(&data_queue->raw_data_mutex);
1198 pthread_mutex_destroy(&data_queue->raw_data_mutex);
/*
 * Sends an identify message on a socket.
 *
 * fd:     socket to write to.
 * seq_no: sequence number placed at the end of the message.
 * id:     identifier bytes (id_len of them are copied, no terminating NUL).
 * id_len: length of id in bytes.
 *
 * Message layout, each integer copied as sizeof(unsigned int) bytes:
 *   total length | id_len | id bytes | seq_no
 * Returns the result of socket_send().
 */
1203 static int __send_identify(int fd, unsigned int seq_no, char *id, int id_len)
1205 int len = sizeof(unsigned int) // total size
1206 + id_len + sizeof(unsigned int) // id
1207 + sizeof(unsigned int); // seq_no
1212 memset(buf, 0x0, len+1);
// total length first, so that the receiver knows how much to read
1215 memcpy(buf, (void*)&len, sizeof(unsigned int));
1216 length += sizeof(unsigned int);
// identifier, length-prefixed
1219 memcpy(buf+length, (void*)&(id_len), sizeof(unsigned int));
1220 length += sizeof(unsigned int);
1221 memcpy(buf+length, (void*)(id), id_len);
// sequence number
1225 memcpy(buf+length, (void*)&(seq_no), sizeof(unsigned int));
1226 length += sizeof(unsigned int);
1228 return socket_send(fd, buf, length);
/*
 * Read one framed message from fd into a freshly calloc()'d
 * pims_ipc_raw_data_s (temp).
 *
 * Fields are consumed from the socket in this order:
 *   total_len, client_id_len, client_id, seq_no,
 *   then call_id_len, call_id, is_data and, for messages that carry a
 *   payload, data_len and data.
 * The total_len == read_len comparison after seq_no distinguishes a message
 * that ends there (presumably the identify handshake reported through
 * *identity -- confirm) from a full request.
 * Each variable-length buffer is allocated as calloc(1, len+1), so one zero
 * byte always follows the received bytes.
 * The error path logs total_len/client_id_len and releases temp with
 * __free_raw_data().
 *
 * NOTE(review): the visible checks after read() only test ret < 0; a short
 * read or EOF (ret == 0) is not rejected by those lines.
 * NOTE(review): client_id_len, call_id_len and data_len come straight from the
 * peer and size the allocations; no upper bound is visible here -- confirm
 * whether one is enforced elsewhere.
 */
1231 static int __recv_raw_data(int fd, pims_ipc_raw_data_s **data, bool *identity)
1234 pims_ipc_raw_data_s *temp;
1236 /* read the size of message. note that ioctl is non-blocking */
1237 if (ioctl(fd, FIONREAD, &len)) {
1238 ERROR("ioctl failed: %d", errno);
1242 /* when server or client closed socket */
1244 INFO("[IPC Socket] connection is closed");
1248 temp = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s));
1250 ERROR("calloc() Fail");
// explicit defaults for the optional parts of the message
1253 temp->client_id = NULL;
1254 temp->client_id_len = 0;
1255 temp->call_id = NULL;
1256 temp->call_id_len = 0;
1258 temp->is_data = FALSE;
1264 unsigned int total_len = 0;
1265 unsigned int is_data = FALSE;
// header: total message length
1269 ret = TEMP_FAILURE_RETRY(read(fd, (void *)&total_len, sizeof(unsigned int)));
1270 if (ret < 0) { ERROR("read error"); break; }
// client id: length prefix, then the bytes
1274 ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->client_id_len), sizeof(unsigned int)));
1275 if (ret < 0) { ERROR("read error"); break; }
1278 temp->client_id = calloc(1, temp->client_id_len+1);
1279 if (NULL == temp->client_id) {
1280 ERROR("calloc() Fail");
// the address of the buffer pointer is passed; assumes socket_recv() takes a pointer-to-pointer -- TODO confirm
1284 ret = socket_recv(fd, (void *)&(temp->client_id), temp->client_id_len);
1286 ERROR("socket_recv error(%d)", ret);
// sequence number
1292 ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->seq_no), sizeof(unsigned int)));
1293 if (ret < 0) { ERROR("read error"); break; }
// nothing left to read when everything announced by total_len has been consumed
1296 if (total_len == read_len) {
// call id: length prefix, then the bytes
1303 ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->call_id_len), sizeof(unsigned int)));
1304 if (ret < 0) { ERROR("read error"); break; }
1307 temp->call_id = calloc(1, temp->call_id_len+1);
1308 if (NULL == temp->call_id) {
1309 ERROR("calloc() Fail");
1313 ret = socket_recv(fd, (void *)&(temp->call_id), temp->call_id_len);
1315 ERROR("socket_recv error(%d)", ret);
// flag telling whether a payload follows
1321 ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(is_data), sizeof(unsigned int)));
1322 if (ret < 0) { ERROR("read error"); break; }
// payload: length prefix, then the bytes
1327 temp->is_data = TRUE;
1328 ret = TEMP_FAILURE_RETRY(read(fd, (void *)&(temp->data_len), sizeof(unsigned int)));
1329 if (ret < 0) { ERROR("read error"); break; }
1332 temp->data = calloc(1, temp->data_len+1);
1333 if (NULL == temp->data) {
1334 ERROR("calloc() Fail");
1338 ret = socket_recv(fd, (void *)&(temp->data), temp->data_len);
1340 ERROR("socket_recv error(%d)", ret);
1346 INFO("client_id : %s, call_id : %s, seq_no : %d", temp->client_id, temp->call_id, temp->seq_no);
// failure path: report what was parsed so far and release the partial message
1353 ERROR("total_len(%d) client_id_len(%d)", total_len, temp->client_id_len);
1354 __free_raw_data(temp);
/*
 * GIOChannel watch callback for one connected client socket.
 *
 * src        channel wrapping the client fd (event_fd)
 * condition  GIOCondition bits that fired
 * data       the pims_ipc_svc_s service instance
 *
 * On G_IO_HUP the client is torn down: its id is resolved with
 * __find_client_id(), its entry is removed from client_info_map, a copy of
 * the id is queued on manager_queue_from_epoll, the manager eventfd is
 * signalled with write_command(), and its pending requests are discarded
 * with __delete_request_queue().
 *
 * Otherwise one message is read with __recv_raw_data(). The code then either
 * registers the client (handshake: new pims_ipc_client_map_s, reply with
 * __send_identify(), client info stored in client_info_map) or forwards the
 * request to the router with __request_push() + write_command(). Which path
 * runs presumably depends on the identity flag -- confirm.
 */
1363 static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data)
1366 int event_fd = g_io_channel_unix_get_fd(src);
1367 char *client_id = NULL;
1368 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1369 if (NULL == ipc_svc) {
1370 ERROR("ipc_svc is NULL");
1374 if (G_IO_HUP & condition) {
1375 INFO("client closed ------------------------client_fd : %d", event_fd);
1380 __find_client_id(ipc_svc, event_fd, true, &client_id);
1382 // Send client_id to manager to terminate worker thread
1384 pthread_mutex_lock(&ipc_svc->client_info_mutex);
1385 g_hash_table_remove(ipc_svc->client_info_map, client_id);
1386 pthread_mutex_unlock(&ipc_svc->client_info_mutex);
// the manager queue receives its own copy of the id string
1388 pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex);
1389 ipc_svc->manager_queue_from_epoll = g_list_append(ipc_svc->manager_queue_from_epoll, (void*)g_strdup(client_id));
1390 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
1391 write_command(ipc_svc->manager, 1);
1392 __delete_request_queue(ipc_svc, client_id);
1399 // receive data from client
1401 bool identity = false;
1402 pims_ipc_raw_data_s *req = NULL;
1404 recv_len = __recv_raw_data(event_fd, &req, &identity);
1406 // send command to router
1408 pims_ipc_client_map_s *client = (pims_ipc_client_map_s*)calloc(1, sizeof(pims_ipc_client_map_s));
1409 if (NULL == client) {
1410 ERROR("calloc() Fail");
1415 client->fd = event_fd;
// server-side client id: "<unique_sequence_number>_<id sent by the client>"
1418 snprintf(temp, sizeof(temp), "%d_%s", ipc_svc->unique_sequence_number++, req->client_id);
1419 client->id = strdup(temp);
1420 free(req->client_id);
1421 req->client_id = NULL;
1422 ipc_svc->client_id_fd_map = g_list_append(ipc_svc->client_id_fd_map, client);
1424 // send server pid to client
// temp is reused: it now holds the server pid formatted as hex
1425 snprintf(temp, sizeof(temp), "%x", getpid());
1426 ret = __send_identify(event_fd, req->seq_no, temp, strlen(temp));
1428 __free_raw_data(req);
1430 ERROR("__send_identify() Fail(%d)", ret);
// NOTE(review): a _create_client_info() failure is only logged; the insert below still
// runs with whatever client_info holds (NULL unless the callee set it) -- confirm intended
1435 pims_ipc_client_info_s *client_info = NULL;
1436 if (0 != _create_client_info(event_fd, &client_info))
1437 ERROR("_create_client_info() Fail");
1438 pthread_mutex_lock(&ipc_svc->client_info_mutex);
1439 g_hash_table_insert(ipc_svc->client_info_map, g_strdup(client->id), client_info);
1440 pthread_mutex_unlock(&ipc_svc->client_info_mutex);
// regular request: map the fd back to its client id and hand the request to the router
1445 __find_client_id(ipc_svc, event_fd, false, &client_id);
1448 __request_push(ipc_svc, client_id, event_fd, req);
1449 write_command(ipc_svc->router, 1);
1452 ERROR("__find_client_id fail : event_fd (%d)", event_fd);
1455 ERROR("receive invalid : %d", event_fd);
/*
 * GIOChannel watch callback for the listening service socket.
 *
 * Accepts one pending connection on ipc_svc->sockfd and installs
 * __request_handler as a watch (G_IO_IN | G_IO_HUP) on the new client fd,
 * passing the same user data along. An accept() failure is logged with the
 * text returned by strerror_r().
 *
 * src        listening channel (unused in the visible code; the fd is taken from ipc_svc)
 * condition  GIOCondition bits that fired
 * data       the pims_ipc_svc_s service instance
 */
1463 static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data)
1465 GIOChannel *channel;
1466 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1467 int client_sockfd = -1;
1468 int sockfd = ipc_svc->sockfd;
1469 struct sockaddr_un clientaddr;
1470 socklen_t client_len = sizeof(clientaddr);
1472 client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len);
1473 if (-1 == client_sockfd) {
1474 char *errmsg = NULL;
1475 char buf[1024] = {0};
// the result is assigned to a char *, i.e. the GNU flavour of strerror_r is expected
1476 errmsg = strerror_r(errno, buf, sizeof(buf));
1478 ERROR("accept error : %s", errmsg);
// wrap the client fd, register the per-client handler, then drop this function's channel reference
1482 channel = g_io_channel_unix_new(client_sockfd);
1483 g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data);
1484 g_io_channel_unref(channel);
/*
 * Prepare the listening service socket and hook it into the GLib main loop.
 *
 * If systemd passed exactly one descriptor and it is a unix stream socket for
 * ipc_svc->service, that descriptor (SD_LISTEN_FDS_START) is adopted.
 * The other visible path removes a stale socket file, creates a PF_UNIX
 * stream socket, binds it to ipc_svc->service, listens with a backlog of 30
 * and applies ipc_svc->group / ipc_svc->mode to the socket file.
 * Finally the fd is wrapped in a GIOChannel watched for G_IO_IN by
 * __socket_handler.
 *
 * user_data  the pims_ipc_svc_s service instance
 */
1489 static void* __main_loop(void *user_data)
1492 struct sockaddr_un addr;
1493 GIOChannel *gio = NULL;
1494 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)user_data;
// socket activation: reuse the descriptor handed over by systemd
1496 if (sd_listen_fds(1) == 1 && sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, ipc_svc->service, 0) > 0) {
1497 ipc_svc->sockfd = SD_LISTEN_FDS_START;
// self-managed socket: start from a clean path
1500 unlink(ipc_svc->service);
1501 ipc_svc->sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
1503 bzero(&addr, sizeof(addr));
1504 addr.sun_family = AF_UNIX;
1505 snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service);
1507 ret = bind(ipc_svc->sockfd, (struct sockaddr *)&addr, sizeof(addr));
1509 ERROR("bind error :%d", ret);
// NOTE(review): no check of the listen/chown/chmod results is visible here,
// unlike the WARN_IF checks in __publish_loop -- confirm
1510 ret = listen(ipc_svc->sockfd, 30);
1512 ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
1513 ret = chmod(ipc_svc->service, ipc_svc->mode);
// new connections are dispatched to __socket_handler from the GLib main loop
1516 gio = g_io_channel_unix_new(ipc_svc->sockfd);
1518 g_io_add_watch(gio, G_IO_IN, __socket_handler, (gpointer)ipc_svc);
/*
 * Create the two in-process eventfds used to wake the router thread
 * ("router" and "manager"), switch both to O_NONBLOCK via fcntl(), and store
 * them in ipc_svc->router / ipc_svc->manager.
 * An eventfd() failure is logged together with errno.
 */
1523 static int __open_router_fd(pims_ipc_svc_s *ipc_svc)
1530 // router inproc eventfd
1531 router = eventfd(0,0);
1533 ERROR("eventfd error : %d", errno);
1536 VERBOSE("router :%d\n", router);
// add O_NONBLOCK on top of the current file status flags
1538 flags = fcntl(router, F_GETFL, 0);
1541 ret = fcntl (router, F_SETFL, flags | O_NONBLOCK);
1543 VERBOSE("rounter fcntl : %d\n", ret);
1545 // manager inproc eventfd
1546 manager = eventfd(0,0);
1547 if (-1 == manager) {
1548 ERROR("eventfd error : %d", errno);
1552 VERBOSE("manager :%d\n", manager);
1554 flags = fcntl(manager, F_GETFL, 0);
1557 ret = fcntl (manager, F_SETFL, flags | O_NONBLOCK);
1559 VERBOSE("manager fcntl : %d\n", ret);
// publish both descriptors only after both have been set up
1561 ipc_svc->router = router;
1562 ipc_svc->manager = manager;
/* Close the router and manager eventfds stored in ipc_svc (counterpart of __open_router_fd). */
1567 static void __close_router_fd(pims_ipc_svc_s *ipc_svc)
1569 close(ipc_svc->router);
1570 close(ipc_svc->manager);
/*
 * Thread entry for the publish (subscriber) socket.
 *
 * Creates a non-blocking PF_UNIX stream socket bound to ipc_svc->service
 * (listen backlog 30, owner/group and mode applied to the socket file),
 * registers it with a new epoll instance and then loops until
 * ipc_svc->epoll_stop_thread is set:
 *   - EPOLLHUP on an fd: remove it from the epoll set and from
 *     ipc_svc->subscribe_fds (guarded by subscribe_fds_mutex);
 *   - event on the listening socket: accept() the subscriber, append its fd
 *     to subscribe_fds and watch it with EPOLLIN.
 * The listening socket is closed once the loop ends.
 *
 * user_data  the pims_ipc_svc_for_publish_s instance
 *
 * NOTE(review): epoll_wait() is called with timeout -1, so a change of
 * epoll_stop_thread is only noticed after some event wakes the call.
 */
1573 static void* __publish_loop(void *user_data)
1578 struct sockaddr_un addr;
1579 struct epoll_event ev = {0};
1580 pims_ipc_svc_for_publish_s *ipc_svc = (pims_ipc_svc_for_publish_s*)user_data;
// start from a clean socket path
1582 unlink(ipc_svc->service);
1583 ipc_svc->publish_sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
1585 bzero(&addr, sizeof(struct sockaddr_un));
1586 addr.sun_family = AF_UNIX;
1587 snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service);
// make the listening socket non-blocking
1589 int flags = fcntl (ipc_svc->publish_sockfd, F_GETFL, 0);
1592 ret = fcntl (ipc_svc->publish_sockfd, F_SETFL, flags | O_NONBLOCK);
1593 VERBOSE("publish socketfd fcntl : %d\n", ret);
1595 ret = bind(ipc_svc->publish_sockfd, (struct sockaddr *)&(addr), sizeof(struct sockaddr_un));
1597 ERROR("bind error :%d", ret);
1598 ret = listen(ipc_svc->publish_sockfd, 30);
1599 WARN_IF(ret != 0, "listen error :%d", ret);
1601 ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
1602 WARN_IF(ret != 0, "chown error :%d", ret);
1603 ret = chmod(ipc_svc->service, ipc_svc->mode);
1604 WARN_IF(ret != 0, "chmod error :%d", ret);
1606 epfd = epoll_create(MAX_EPOLL_EVENT);
1608 ev.events = EPOLLIN | EPOLLHUP;
1609 ev.data.fd = ipc_svc->publish_sockfd;
// NOTE(review): the warning text below says "listen error" although it reports the epoll_ctl() result
1611 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ipc_svc->publish_sockfd, &ev);
1612 WARN_IF(ret != 0, "listen error :%d", ret);
1614 while (!ipc_svc->epoll_stop_thread) {
1616 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
1617 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1);
// re-check the stop flag right after waking up
1619 if (ipc_svc->epoll_stop_thread)
1622 if (event_num == -1) {
1623 if (errno != EINTR) {
1624 ERROR("errno:%d\n", errno);
1629 for (i = 0; i < event_num; i++) {
1630 int event_fd = events[i].data.fd;
// subscriber hung up: stop watching it and forget its fd
1632 if (events[i].events & EPOLLHUP) {
1633 VERBOSE("client closed -----------------------------------------:%d", event_fd);
1634 if (epoll_ctl(epfd, EPOLL_CTL_DEL, event_fd, events) == -1) {
1635 ERROR("epoll_ctl (EPOLL_CTL_DEL) fail : errno(%d)", errno);
1639 // Find client_id and delete
1640 GList *cursor = NULL;
1642 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
1643 cursor = g_list_first(ipc_svc->subscribe_fds);
// list elements carry the fd value itself, stored in the pointer slot
1645 if (event_fd == (int)cursor->data) {
1646 ipc_svc->subscribe_fds = g_list_delete_link(ipc_svc->subscribe_fds, cursor);
1649 cursor = g_list_next(cursor);
1651 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
1654 else if (event_fd == ipc_svc->publish_sockfd) { // connect client
1655 struct sockaddr_un remote;
1656 int remote_len = sizeof(remote);
1657 int client_fd = accept(ipc_svc->publish_sockfd, (struct sockaddr *)&remote, (socklen_t*) &remote_len);
1658 if (client_fd == -1) {
1659 ERROR("accept fail : errno : %d", errno);
1662 VERBOSE("client subscriber connect: %d", client_fd);
// remember the subscriber fd (stored by value in the list's pointer slot)
1664 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
1665 ipc_svc->subscribe_fds = g_list_append(ipc_svc->subscribe_fds, (void*)client_fd);
1666 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
// watch the new subscriber for input
1668 ev.events = EPOLLIN;
1669 ev.data.fd = client_fd;
1670 if (epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
1671 ERROR("epoll_ctl (EPOLL_CTL_ADD) fail : error(%d)\n", errno);
1678 close(ipc_svc->publish_sockfd);
/*
 * Ask the publish thread to stop by raising ipc_svc->epoll_stop_thread,
 * the flag tested by the __publish_loop while-condition.
 * NOTE(review): the flag is written here as a plain store; whether the
 * publish thread is additionally woken up is not visible in this function.
 */
1684 static void __stop_for_publish(pims_ipc_svc_for_publish_s *ipc_svc)
1686 ipc_svc->epoll_stop_thread = true;
/*
 * Thread entry for the router.
 *
 * Polls the router eventfd (pollfds[0]) and the manager eventfd (pollfds[1])
 * with a 1000 ms timeout.
 *   - router readable: consume the wake-up with read_command() and run
 *     __process_router_event(ipc_svc, false);
 *   - manager readable: consume the wake-up, run __process_manager_event()
 *     and, while ipc_svc->delay_count > 0, also
 *     __process_router_event(ipc_svc, true).
 * As long as either processing step returns a value > 0, both queues are
 * processed again without going back to poll().
 *
 * data  the pims_ipc_svc_s service instance
 */
1689 static void* __router_loop(void *data)
1691 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1693 struct pollfd *pollfds;
1695 pollfds = (struct pollfd*)calloc(fd_count, sizeof(struct pollfd));
1696 if (NULL == pollfds) {
1697 ERROR("calloc() Fail");
1700 pollfds[0].fd = ipc_svc->router;
1701 pollfds[0].events = POLLIN;
1702 pollfds[1].fd = ipc_svc->manager;
1703 pollfds[1].events = POLLIN;
// results of the last processing step for each queue
1708 int check_router_queue = -1;
1709 int check_manager_queue = -1;
1712 ret = poll(pollfds, fd_count, 1000);
// an interrupted poll() is simply retried
1713 if (ret == -1 && errno == EINTR) {
1715 continue; //return NULL;
1721 if (pollfds[0].revents & POLLIN) {
1722 // request router: send request to worker
1723 if (sizeof (dummy) == read_command(pollfds[0].fd, &dummy)) {
1724 check_router_queue = __process_router_event(ipc_svc, false);
1728 if (pollfds[1].revents & POLLIN) {
1730 if (sizeof (dummy) == read_command(pollfds[1].fd, &dummy)) {
1731 check_manager_queue = __process_manager_event(ipc_svc);
1732 if (ipc_svc->delay_count > 0)
1733 check_router_queue = __process_router_event(ipc_svc, true);
// keep draining while either step reports a value > 0
1739 while(check_router_queue > 0 || check_manager_queue > 0) {
1740 read_command(pollfds[0].fd, &dummy);
1741 check_router_queue = __process_router_event(ipc_svc, false);
1743 read_command(pollfds[1].fd, &dummy);
1744 check_manager_queue = __process_manager_event(ipc_svc);
1745 if (ipc_svc->delay_count > 0)
1746 check_router_queue = __process_router_event(ipc_svc, true);
/*
 * Start the IPC service and run the GLib main loop.
 *
 * loop  main loop to run; when NULL a new one is created with
 *       g_main_loop_new(NULL, FALSE).
 *
 * Sequence visible here: launch the publish thread (if a publish singleton
 * exists), open the router/manager eventfds, pre-launch
 * _g_singleton->workers_max_count worker threads and the router thread,
 * set up the listening socket by calling __main_loop() directly on the
 * calling thread, then run g_main_loop_run(). After the main loop returns
 * the eventfds are closed and the publish thread is asked to stop.
 */
1755 API void pims_ipc_svc_run_main_loop(GMainLoop* loop)
1758 GMainLoop* main_loop = loop;
1760 if (main_loop == NULL) {
1761 main_loop = g_main_loop_new(NULL, FALSE);
1764 if (_g_singleton_for_publish)
1765 __launch_thread(__publish_loop, _g_singleton_for_publish);
1768 ret = __open_router_fd(_g_singleton);
1772 // launch worker threads in advance
1773 for (i = 0; i < _g_singleton->workers_max_count; i++)
1774 __launch_thread(__worker_loop, _g_singleton);
1776 __launch_thread(__router_loop, _g_singleton);
// not a thread: registers the listening socket with the GLib main context before the loop runs
1777 __main_loop(_g_singleton);
1780 g_main_loop_run(main_loop);
// shutdown path, reached once g_main_loop_run() returns
1783 __close_router_fd(_g_singleton);
1785 if (_g_singleton_for_publish)
1786 __stop_for_publish(_g_singleton_for_publish);
/*
 * Register the callback stored in _client_disconnected_cb.
 *
 * callback   function to store
 * user_data  opaque pointer stored alongside the callback
 *
 * If a callback is already set, "already registered" is logged
 * (presumably the existing registration is then kept -- confirm).
 */
1790 API void pims_ipc_svc_set_client_disconnected_cb(pims_ipc_svc_client_disconnected_cb callback, void *user_data)
1792 if (_client_disconnected_cb.callback) {
1793 ERROR("already registered");
1796 _client_disconnected_cb.callback = callback;
1797 _client_disconnected_cb.user_data = user_data;
/*
 * Check with cynara whether the client served by this worker holds privilege.
 *
 * ipc        handle whose value is used as the worker id key into
 *            _g_singleton->worker_client_info_map
 * privilege  privilege string to check; NULL is rejected with an error log
 *
 * The client info is looked up and cloned under client_info_mutex, so that
 * cynara_check() can then run under cynara_mutex only, using the clone's
 * smack label, session and uid. The clone is destroyed afterwards. The
 * result is compared against CYNARA_API_ACCESS_ALLOWED (presumably yielding
 * true on a match and false otherwise -- confirm).
 */
1800 API bool pims_ipc_svc_check_privilege(pims_ipc_h ipc, char *privilege)
1803 int worker_id = (int)ipc;
1804 pims_ipc_client_info_s *client_info = NULL;
1805 pims_ipc_client_info_s *client_info_clone = NULL;
1807 if (NULL == privilege) {
1808 ERROR("privilege is NULL");
// take a private copy so the map entry is not used outside the lock
1812 pthread_mutex_lock(&_g_singleton->client_info_mutex);
1813 client_info = g_hash_table_lookup(_g_singleton->worker_client_info_map, GINT_TO_POINTER(worker_id));
1814 if (NULL == client_info) {
1815 ERROR("client_info is NULL");
1816 pthread_mutex_unlock(&_g_singleton->client_info_mutex);
1819 client_info_clone = _clone_client_info(client_info);
1820 pthread_mutex_unlock(&_g_singleton->client_info_mutex);
1822 if (NULL == client_info_clone) {
1823 ERROR("client_info_clone is NULL");
// calls into the shared cynara handle are serialized by cynara_mutex
1827 pthread_mutex_lock(&_g_singleton->cynara_mutex);
1828 ret = cynara_check(_g_singleton->cynara, client_info_clone->smack, client_info_clone->client_session, client_info_clone->uid, privilege);
1829 pthread_mutex_unlock(&_g_singleton->cynara_mutex);
1831 _destroy_client_info(client_info_clone);
1833 if (CYNARA_API_ACCESS_ALLOWED == ret)
/*
 * Copy the smack label of the client served by this worker.
 *
 * ipc      handle whose value is used as the worker id key into
 *          _g_singleton->worker_client_info_map
 * p_smack  receives a strdup()'d copy of the label, which the caller must
 *          free(); in the visible code it is written only when the stored
 *          label is non-NULL
 *
 * The lookup and the copy both happen under client_info_mutex, which is
 * released on every visible path (lookup failure, strdup failure, success).
 */
1839 API int pims_ipc_svc_get_smack_label(pims_ipc_h ipc, char **p_smack)
1841 pims_ipc_client_info_s *client_info = NULL;
1842 int worker_id = (int)ipc;
1844 pthread_mutex_lock(&_g_singleton->client_info_mutex);
1845 client_info = g_hash_table_lookup(_g_singleton->worker_client_info_map, GINT_TO_POINTER(worker_id));
1846 if (NULL == client_info) {
1847 ERROR("g_hash_table_lookup(%d) return NULL", worker_id);
1848 pthread_mutex_unlock(&_g_singleton->client_info_mutex);
// duplicate while still holding the lock so the label cannot change underneath us
1852 if (client_info->smack) {
1853 *p_smack = strdup(client_info->smack);
1854 if (NULL == *p_smack) {
1855 ERROR("strdup() return NULL");
1856 pthread_mutex_unlock(&_g_singleton->client_info_mutex);
1860 pthread_mutex_unlock(&_g_singleton->client_info_mutex);