4 * Copyright (c) 2012 - 2013 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
27 #include <poll.h> // pollfds
28 #include <fcntl.h> //fcntl
30 #include <systemd/sd-daemon.h>
34 #include <sys/un.h> // sockaddr_un
35 #include <sys/ioctl.h> // ioctl
36 #include <sys/epoll.h> // epoll
37 #include <sys/eventfd.h> // eventfd
38 #include <sys/socket.h> //socket
39 #include <sys/types.h>
41 #include "pims-internal.h"
42 #include "pims-debug.h"
43 #include "pims-socket.h"
44 #include "pims-ipc-data.h"
45 #include "pims-ipc-data-internal.h"
46 #include "pims-ipc-svc.h"
48 #define PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT 2
56 GHashTable *cb_table; // call_id, cb_data
58 // Global socket info and epoll thread
60 bool epoll_stop_thread;
62 /////////////////////////////////////////////
63 // router inproc eventfd
65 int delay_count; // not need mutex
66 // epoll thread add client_fd, when receive, router read requests
67 GList *request_queue; // client_id lists to send request
68 pthread_mutex_t request_data_queue_mutex;
69 GHashTable *request_data_queue; // key : client id, data : GList pims_ipc_raw_data_s (client_fd, seq_no, request(command), additional data...)
70 // router add client when receive connecting request, remove client when disconneting request in router thread
71 // manager remove client when terminating client without disconnect request in router thread
72 GHashTable *client_worker_map; // key : client_id, worker_fd, not need mutex
73 GList *client_id_fd_map; // pims_ipc_client_map_s
74 //key :client_id(pid:seq_no), data : client_fd
76 /////////////////////////////////////////////
77 pthread_mutex_t task_fds_mutex;
78 // when starting worker thread, register fd
79 // when endting worker thread, deregister fd
80 GHashTable *task_fds; // worker_fd - worker data (worker fd, client_fd, request queue(GList), stop_thread)
81 int workers_max_count;
83 /////////////////////////////////////////////
84 // manager inproc eventfd
86 // write by new worker thread, read by manager in router thread, need mutex
87 pthread_mutex_t manager_queue_from_worker_mutex;
88 GList *manager_queue_from_worker; // worker_fd => add to workers
89 // write in epoll thread(for dead client), read by manager in router thread, need mutex
90 pthread_mutex_t manager_queue_from_epoll_mutex;
91 GList *manager_queue_from_epoll; // cliend_fd => find worker_fd => add to idle workers
92 // managed by manager, router find idle worker when connecting new client in router thread => remove from idle workers
93 GList *workers; // worker_fd list, not need mutex
94 /////////////////////////////////////////////
103 bool epoll_stop_thread;
104 pthread_mutex_t subscribe_fds_mutex;
105 GList *subscribe_fds; // cliend fd list
106 } pims_ipc_svc_for_publish_s;
111 }pims_ipc_client_map_s;
114 pims_ipc_svc_call_cb callback;
119 pims_ipc_svc_client_disconnected_cb callback;
121 } pims_ipc_svc_client_disconnected_cb_t;
125 int worker_id; // pthrad_self()
128 GList *queue; // pims_ipc_raw_data_s list
129 pthread_mutex_t queue_mutex;
130 } pims_ipc_worker_data_s;
134 unsigned int client_id_len;
137 unsigned int call_id_len;
138 unsigned int is_data;
139 unsigned int data_len;
141 }pims_ipc_raw_data_s;
146 GList *raw_data; // pims_ipc_raw_data_s list
147 pthread_mutex_t raw_data_mutex;
150 static pims_ipc_svc_s *_g_singleton = NULL;
151 static pims_ipc_svc_for_publish_s *_g_singleton_for_publish = NULL;
153 static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL};
155 static void __free_raw_data(pims_ipc_raw_data_s *data)
159 free(data->client_id);
165 static void __worker_data_free(gpointer data)
167 pims_ipc_worker_data_s *worker_data = (pims_ipc_worker_data_s*)data;
169 pthread_mutex_lock(&worker_data->queue_mutex);
170 if (worker_data->queue) {
171 GList *cursor = g_list_first(worker_data->queue);
174 pims_ipc_raw_data_s *data = l->data;
175 cursor = g_list_next(cursor);
176 worker_data->queue = g_list_remove_link(worker_data->queue, l);
178 __free_raw_data(data);
181 pthread_mutex_unlock(&worker_data->queue_mutex);
185 API int pims_ipc_svc_init(char *service, gid_t group, mode_t mode)
188 ERROR("Already exist");
192 _g_singleton = g_new0(pims_ipc_svc_s, 1);
193 _g_singleton->service = g_strdup(service);
194 _g_singleton->group = group;
195 _g_singleton->mode = mode;
196 _g_singleton->workers_max_count = PIMS_IPC_WORKERS_DEFAULT_MAX_COUNT;
197 _g_singleton->cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
198 ASSERT(_g_singleton->cb_table);
200 pthread_mutex_init(&_g_singleton->request_data_queue_mutex, 0);
201 _g_singleton->request_queue = NULL;
202 _g_singleton->request_data_queue = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // client_id - pims_ipc_raw_data_s
203 ASSERT(_g_singleton->request_data_queue);
204 _g_singleton->client_worker_map = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, NULL); // client id - worker_fd mapping
205 ASSERT(_g_singleton->client_worker_map);
206 _g_singleton->delay_count = 0;
208 pthread_mutex_init(&_g_singleton->task_fds_mutex, 0);
209 _g_singleton->task_fds = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __worker_data_free); // pims_ipc_worker_data_s
210 ASSERT(_g_singleton->task_fds);
212 pthread_mutex_init(&_g_singleton->manager_queue_from_epoll_mutex, 0);
213 _g_singleton->manager_queue_from_epoll = NULL;
215 pthread_mutex_init(&_g_singleton->manager_queue_from_worker_mutex, 0);
216 _g_singleton->manager_queue_from_worker = NULL;
217 _g_singleton->workers = NULL;
219 _g_singleton->epoll_stop_thread = false;
224 API int pims_ipc_svc_deinit(void)
229 g_free(_g_singleton->service);
230 g_hash_table_destroy(_g_singleton->cb_table);
232 pthread_mutex_destroy(&_g_singleton->request_data_queue_mutex);
233 g_hash_table_destroy(_g_singleton->client_worker_map);
234 g_hash_table_destroy(_g_singleton->request_data_queue);
235 g_list_free_full(_g_singleton->request_queue, g_free);
237 pthread_mutex_destroy(&_g_singleton->task_fds_mutex);
238 g_hash_table_destroy(_g_singleton->task_fds);
240 pthread_mutex_destroy(&_g_singleton->manager_queue_from_epoll_mutex);
241 g_list_free_full(_g_singleton->manager_queue_from_epoll, g_free);
242 pthread_mutex_destroy(&_g_singleton->manager_queue_from_worker_mutex);
243 g_list_free(_g_singleton->manager_queue_from_worker);
245 GList *cursor = g_list_first(_g_singleton->client_id_fd_map);
247 pims_ipc_client_map_s *client = cursor->data;
248 _g_singleton->client_id_fd_map = g_list_remove_link(_g_singleton->client_id_fd_map, cursor); //free(client_id);
252 cursor = g_list_first(_g_singleton->client_id_fd_map);
254 g_list_free(_g_singleton->client_id_fd_map);
256 g_list_free(_g_singleton->workers);
257 g_free(_g_singleton);
263 API int pims_ipc_svc_register(char *module, char *function, pims_ipc_svc_call_cb callback, void *userdata)
265 pims_ipc_svc_cb_s *cb_data = NULL;
266 gchar *call_id = NULL;
268 if (!module || !function || !callback) {
269 ERROR("Invalid argument");
272 cb_data = g_new0(pims_ipc_svc_cb_s, 1);
273 call_id = PIMS_IPC_MAKE_CALL_ID(module, function);
275 VERBOSE("register cb id[%s]", call_id);
276 cb_data->callback = callback;
277 cb_data->user_data = userdata;
278 g_hash_table_insert(_g_singleton->cb_table, call_id, cb_data);
283 API int pims_ipc_svc_init_for_publish(char *service, gid_t group, mode_t mode)
285 if (_g_singleton_for_publish) {
286 ERROR("Already exist");
290 _g_singleton_for_publish = g_new0(pims_ipc_svc_for_publish_s, 1);
291 _g_singleton_for_publish->service = g_strdup(service);
292 _g_singleton_for_publish->group = group;
293 _g_singleton_for_publish->mode = mode;
294 _g_singleton_for_publish->subscribe_fds = NULL;
296 pthread_mutex_init(&_g_singleton_for_publish->subscribe_fds_mutex, 0);
301 API int pims_ipc_svc_deinit_for_publish(void)
303 if (!_g_singleton_for_publish)
306 pthread_mutex_destroy(&_g_singleton_for_publish->subscribe_fds_mutex);
307 g_list_free(_g_singleton_for_publish->subscribe_fds);
309 g_free(_g_singleton_for_publish->service);
310 g_free(_g_singleton_for_publish);
311 _g_singleton_for_publish = NULL;
316 API int pims_ipc_svc_publish(char *module, char *event, pims_ipc_data_h data)
318 pims_ipc_svc_for_publish_s *ipc_svc = _g_singleton_for_publish;
319 gboolean is_valid = FALSE;
320 gchar *call_id = PIMS_IPC_MAKE_CALL_ID(module, event);
321 pims_ipc_data_s *data_in = (pims_ipc_data_s*)data;
322 unsigned int call_id_len = strlen(call_id);
323 unsigned int is_data = FALSE;
327 unsigned int len = sizeof(unsigned int) // total size
328 + call_id_len + sizeof(unsigned int) // call_id
329 + sizeof(unsigned int); // is data
330 unsigned int total_len = len;
334 len += sizeof(unsigned int);
335 total_len = len + data_in->buf_size; // data
340 memset(buf, 0x0, len+1);
343 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
344 length += sizeof(unsigned int);
347 memcpy(buf+length, (void*)&(call_id_len), sizeof(unsigned int));
348 length += sizeof(unsigned int);
349 memcpy(buf+length, (void*)(call_id), call_id_len);
350 length += call_id_len;
354 memcpy(buf+length, (void*)&(is_data), sizeof(unsigned int));
355 length += sizeof(unsigned int);
359 memcpy(buf+length, (void*)&(data_in->buf_size), sizeof(unsigned int));
360 length += sizeof(unsigned int);
363 // Publish to clients
364 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
365 GList *cursor = g_list_first(ipc_svc->subscribe_fds);
368 int fd = (int)cursor->data;
369 ret = socket_send(fd, buf, length);
371 ERROR("socket_send publish error : %d", ret);
375 ret = socket_send_data(fd, data_in->buf, data_in->buf_size);
377 ERROR("socket_send_data publish error : %d", ret);
380 cursor = g_list_next(cursor);
382 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
387 if (is_valid == FALSE)
392 static void __run_callback(int worker_id, char *call_id, pims_ipc_data_h dhandle_in, pims_ipc_data_h *dhandle_out)
394 pims_ipc_svc_cb_s *cb_data = NULL;
396 VERBOSE("Call id [%s]", call_id);
398 cb_data = (pims_ipc_svc_cb_s*)g_hash_table_lookup(_g_singleton->cb_table, call_id);
399 if (cb_data == NULL) {
400 VERBOSE("unable to find %s", call_id);
404 cb_data->callback((pims_ipc_h)worker_id, dhandle_in, dhandle_out, cb_data->user_data);
407 static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data, pims_ipc_raw_data_s**out)
409 pims_ipc_raw_data_s *raw_data = NULL;
410 raw_data = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s));
411 pims_ipc_data_s *data_in = (pims_ipc_data_s*)data;
413 raw_data->call_id = strdup(call_id);
414 raw_data->call_id_len = strlen(raw_data->call_id);
415 raw_data->seq_no = seq_no;
417 if (data_in && data_in->buf_size > 0) {
418 raw_data->is_data = TRUE;
419 raw_data->data = calloc(1, data_in->buf_size+1);
420 memcpy(raw_data->data, data_in->buf, data_in->buf_size);
421 raw_data->data_len = data_in->buf_size;
424 raw_data->is_data = FALSE;
425 raw_data->data_len = 0;
426 raw_data->data = NULL;
432 static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data)
435 unsigned int client_id_len = strlen(client_id);
438 INFO("No data to send NULL\n");
442 unsigned int len = sizeof(unsigned int) // total size
443 + client_id_len + sizeof(unsigned int) // client_id
444 + sizeof(unsigned int) // seq_no
445 + data->call_id_len + sizeof(unsigned int) // call_id
446 + sizeof(unsigned int); // is data
447 unsigned int total_len = len;
450 len += sizeof(unsigned int); // data
451 total_len = len + data->data_len; // data
454 INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id, data->call_id, data->seq_no, len, total_len);
459 memset(buf, 0x0, len+1);
462 memcpy(buf, (void*)&total_len, sizeof(unsigned int));
463 length += sizeof(unsigned int);
466 memcpy(buf+length, (void*)&(client_id_len), sizeof(unsigned int));
467 length += sizeof(unsigned int);
468 memcpy(buf+length, (void*)(client_id), client_id_len);
469 length += client_id_len;
472 memcpy(buf+length, (void*)&(data->seq_no), sizeof(unsigned int));
473 length += sizeof(unsigned int);
476 memcpy(buf+length, (void*)&(data->call_id_len), sizeof(unsigned int));
477 length += sizeof(unsigned int);
478 memcpy(buf+length, (void*)(data->call_id), data->call_id_len);
479 length += data->call_id_len;
482 memcpy(buf+length, (void*)&(data->is_data), sizeof(unsigned int));
483 length += sizeof(unsigned int);
486 memcpy(buf+length, (void*)&(data->data_len), sizeof(unsigned int));
487 length += sizeof(unsigned int);
488 ret = socket_send(fd, buf, length);
492 ret += socket_send_data(fd, data->data, data->data_len);
495 ret = socket_send(fd, buf, length);
500 static gboolean __worker_raw_data_pop(pims_ipc_worker_data_s *worker, pims_ipc_raw_data_s **data)
505 pthread_mutex_lock(&worker->queue_mutex);
506 if (!worker->queue) {
507 pthread_mutex_unlock(&worker->queue_mutex);
512 *data = g_list_first(worker->queue)->data;
513 worker->queue = g_list_delete_link(worker->queue, g_list_first(worker->queue));
514 pthread_mutex_unlock(&worker->queue_mutex);
519 static void* __worker_loop(void *data)
524 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
525 pims_ipc_worker_data_s *worker_data;
526 bool disconnected = false;
528 worker_fd = eventfd(0, 0);
531 worker_id = (int)pthread_self();
533 worker_data = calloc(1, sizeof(pims_ipc_worker_data_s));
534 worker_data->fd = worker_fd;
535 worker_data->worker_id = worker_id;
536 worker_data->client_fd = -1;
537 worker_data->stop_thread = false;
538 pthread_mutex_init(&worker_data->queue_mutex, 0);
540 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
541 g_hash_table_insert(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd), worker_data);
542 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
544 pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex);
545 ipc_svc->manager_queue_from_worker = g_list_append(ipc_svc->manager_queue_from_worker, (void*)worker_fd);
546 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
548 write_command(ipc_svc->manager, 1);
549 DEBUG("worker register to manager : worker_id(%08x00), worker_fd(%d)\n", worker_id, worker_fd);
551 struct pollfd *pollfds = (struct pollfd*) malloc (1 * sizeof (struct pollfd));
552 pollfds[0].fd = worker_fd;
553 pollfds[0].events = POLLIN;
555 while (!worker_data->stop_thread) {
557 if (worker_data->stop_thread)
559 ret = poll(pollfds, 1, 3000); // waiting command from router
560 if (ret == -1 && errno == EINTR) {
566 if (worker_data->stop_thread)
570 pims_ipc_raw_data_s *raw_data = NULL;
571 pims_ipc_raw_data_s *result = NULL;
573 if (pollfds[0].revents & POLLIN) {
575 read_command(pollfds[0].fd, &dummy);
576 if (__worker_raw_data_pop(worker_data, &raw_data)) {
577 pims_ipc_data_h data_in = NULL;
578 pims_ipc_data_h data_out = NULL;
580 if (strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id) == 0) {
583 else if (strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id) == 0) {
587 data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len);
588 raw_data->data = NULL;
589 raw_data->data_len = 0;
590 raw_data->is_data = false;
591 __run_callback(worker_id, raw_data->call_id, data_in, &data_out);
592 pims_ipc_data_destroy(data_in);
596 __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result);
597 pims_ipc_data_destroy(data_out);
600 __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result);
602 if (worker_data->client_fd != -1)
603 __send_raw_data(worker_data->client_fd, raw_data->client_id, result);
604 __free_raw_data(raw_data);
605 __free_raw_data(result);
612 ERROR("client fd closed, worker_fd : %d", worker_fd);
613 INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd);
615 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
616 g_hash_table_remove(ipc_svc->task_fds, GINT_TO_POINTER(worker_fd)); // __worker_data_free will be called
617 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
620 free ((void*)pollfds);
622 if (_client_disconnected_cb.callback)
623 _client_disconnected_cb.callback((pims_ipc_h)worker_id, _client_disconnected_cb.user_data);
628 static void __launch_thread(void *(*start_routine) (void *), void *data)
634 pthread_attr_init(&attr);
635 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
637 pthread_create(&worker, &attr, start_routine, data);
638 // detach this thread
639 pthread_detach(worker);
642 static gboolean __is_worker_available()
644 if (_g_singleton->workers)
650 static int __get_worker(const char *client_id, int *worker_id)
655 if (!__is_worker_available()) {
656 ERROR("There is no idle worker");
659 *worker_id = (int)(g_list_first(_g_singleton->workers)->data);
660 _g_singleton->workers = g_list_delete_link(_g_singleton->workers,
661 g_list_first(_g_singleton->workers));
663 g_hash_table_insert(_g_singleton->client_worker_map, g_strdup(client_id), GINT_TO_POINTER(*worker_id));
668 static int __find_worker(const char *client_id, int *worker_fd)
670 char *orig_pid = NULL;
676 if (FALSE == g_hash_table_lookup_extended(_g_singleton->client_worker_map, client_id,
677 (gpointer*)&orig_pid, (gpointer*)&fd)) {
678 VERBOSE("unable to find worker id for %s", client_id);
682 *worker_fd = GPOINTER_TO_INT(fd);
686 static bool __request_pop(pims_ipc_request_s *data_queue, pims_ipc_raw_data_s **data)
691 pthread_mutex_lock(&data_queue->raw_data_mutex);
692 cursor = g_list_first(data_queue->raw_data);
694 *data = cursor->data;
695 data_queue->raw_data = g_list_delete_link(data_queue->raw_data, cursor);
696 (data_queue->request_count)--;
703 pthread_mutex_unlock(&data_queue->raw_data_mutex);
707 static bool __worker_raw_data_push(pims_ipc_worker_data_s *worker_data, int client_fd, pims_ipc_raw_data_s *data)
709 pthread_mutex_lock(&worker_data->queue_mutex);
710 worker_data->queue = g_list_append(worker_data->queue, data);
711 worker_data->client_fd = client_fd;
712 pthread_mutex_unlock(&worker_data->queue_mutex);
717 static int __process_router_event(pims_ipc_svc_s *ipc_svc, gboolean for_queue)
719 gboolean is_valid = FALSE;
720 pims_ipc_request_s *data_queue = NULL;
721 GList *queue_cursor = NULL;
723 char *client_id = NULL;
728 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
729 queue_cursor = g_list_first(ipc_svc->request_queue);
730 if (NULL == queue_cursor) {
731 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
734 client_id = (char *)(queue_cursor->data);
735 ASSERT(client_id != NULL);
736 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
738 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd, (gpointer*)&data_queue);
741 ipc_svc->delay_count--;
743 if (ret == TRUE && data_queue) {
745 pims_ipc_worker_data_s *worker_data = NULL;
747 pthread_mutex_lock(&data_queue->raw_data_mutex);
748 GList *cursor = g_list_first(data_queue->raw_data);
750 pthread_mutex_unlock(&data_queue->raw_data_mutex);
754 pims_ipc_raw_data_s *data = (pims_ipc_raw_data_s*)(cursor->data);
755 char *call_id = data->call_id;
756 int client_fd = data_queue->client_fd;
758 ASSERT(call_id != NULL);
760 VERBOSE("call_id = [%s]", call_id);
761 if (strcmp(PIMS_IPC_CALL_ID_CREATE, call_id) == 0) {
762 // Get a worker. If cannot get a worker, create a worker and enqueue a current request
763 __launch_thread(__worker_loop, ipc_svc);
764 if (__get_worker((const char*)client_id, &worker_fd) != 0) {
765 ipc_svc->delay_count++;
766 pthread_mutex_unlock(&data_queue->raw_data_mutex);
773 if (__find_worker((const char*)client_id, &worker_fd) != 0) {
774 ERROR("unable to find a worker");
775 pthread_mutex_unlock(&data_queue->raw_data_mutex);
779 pthread_mutex_unlock(&data_queue->raw_data_mutex);
781 VERBOSE("routing client_id : %s, seq_no: %d, client_fd = %d, worker fd = %d", client_id, data->seq_no, client_fd, worker_fd);
786 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
787 if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds,
788 GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) {
789 ERROR("hash lookup fail : worker_fd (%d)", worker_fd);
790 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
794 if (__request_pop(data_queue, &data)) {
795 __worker_raw_data_push(worker_data, client_fd, data);
796 write_command(worker_fd, 1);
799 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
802 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
804 ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, queue_cursor);
805 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
810 if (is_valid == FALSE)
816 static int __process_manager_event(pims_ipc_svc_s *ipc_svc)
818 GList *cursor = NULL;
821 // client socket terminated without disconnect request
822 pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex);
823 if (ipc_svc->manager_queue_from_epoll) {
824 cursor = g_list_first(ipc_svc->manager_queue_from_epoll);
825 char *client_id = (char*)cursor->data;
826 __find_worker(client_id, &worker_fd);
828 ipc_svc->manager_queue_from_epoll = g_list_delete_link(ipc_svc->manager_queue_from_epoll, cursor);
829 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
832 g_hash_table_remove(ipc_svc->client_worker_map, client_id);
835 // stop worker thread
838 pims_ipc_worker_data_s *worker_data;
840 pthread_mutex_lock(&ipc_svc->task_fds_mutex);
841 if (FALSE == g_hash_table_lookup_extended(ipc_svc->task_fds,
842 GINT_TO_POINTER(worker_fd), (gpointer*)&org_fd, (gpointer*)&worker_data)) {
843 ERROR("g_hash_table_lookup_extended fail : worker_fd (%d)", worker_fd);
844 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
847 worker_data->stop_thread = true;
848 worker_data->client_fd = -1;
849 pthread_mutex_unlock(&ipc_svc->task_fds_mutex);
851 write_command(worker_fd, 1);
852 VERBOSE("write command to worker terminate (worker_fd : %d)", worker_fd);
856 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
859 pthread_mutex_lock(&ipc_svc->manager_queue_from_worker_mutex);
860 if (ipc_svc->manager_queue_from_worker) {
862 cursor = g_list_first(ipc_svc->manager_queue_from_worker);
864 worker_fd = (int)cursor->data;
865 ipc_svc->manager_queue_from_worker = g_list_delete_link(ipc_svc->manager_queue_from_worker, cursor);
868 DEBUG("add idle worker_fd : %d", worker_fd);
869 ipc_svc->workers = g_list_append(ipc_svc->workers, (void*)worker_fd);
871 cursor = g_list_first(ipc_svc->manager_queue_from_worker);
873 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
876 pthread_mutex_unlock(&ipc_svc->manager_queue_from_worker_mutex);
881 // if delete = true, steal client_id, then free(client_id)
882 // if delete = false, return client_id pointer, then do no call free(client_id
883 static int __find_client_id(pims_ipc_svc_s *ipc_svc, int client_fd, bool delete, char **client_id)
885 pims_ipc_client_map_s *client;
886 GList *cursor = NULL;
887 cursor = g_list_first(ipc_svc->client_id_fd_map);
889 client = cursor->data;
890 if (client->fd == client_fd) {
891 *client_id = client->id;
894 ipc_svc->client_id_fd_map = g_list_delete_link(ipc_svc->client_id_fd_map, cursor); //free(client);
899 cursor = g_list_next(cursor);
904 static void __request_push(pims_ipc_svc_s *ipc_svc, char *client_id, int client_fd, pims_ipc_raw_data_s *data)
908 pims_ipc_request_s *data_queue = NULL;
910 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
911 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue);
912 if (ret == TRUE && data_queue) {
915 data_queue = calloc(1, sizeof(pims_ipc_request_s));
916 data_queue->request_count = 0;
917 pthread_mutex_init(&data_queue->raw_data_mutex, 0);
919 g_hash_table_insert(ipc_svc->request_data_queue, g_strdup(client_id), data_queue);
921 ipc_svc->request_queue = g_list_append(ipc_svc->request_queue, g_strdup(client_id));
922 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
924 pthread_mutex_lock(&data_queue->raw_data_mutex);
925 data_queue->raw_data = g_list_append(data_queue->raw_data, data);
926 data_queue->client_fd = client_fd;
927 data_queue->request_count++;
928 pthread_mutex_unlock(&data_queue->raw_data_mutex);
931 static void __delete_request_queue(pims_ipc_svc_s *ipc_svc, char *client_id)
933 pims_ipc_request_s *data_queue = NULL;
939 pthread_mutex_lock(&ipc_svc->request_data_queue_mutex);
940 ret = g_hash_table_lookup_extended(ipc_svc->request_data_queue, (void*)client_id, (gpointer*)&org_fd,(gpointer*)&data_queue);
942 g_hash_table_remove(ipc_svc->request_data_queue, (void*)client_id);
944 cursor = g_list_first(ipc_svc->request_queue);
948 cursor = g_list_next(cursor);
949 if (id && strcmp(id, client_id) == 0) {
951 ipc_svc->request_queue = g_list_delete_link(ipc_svc->request_queue, l);
954 pthread_mutex_unlock(&ipc_svc->request_data_queue_mutex);
957 pthread_mutex_lock(&data_queue->raw_data_mutex);
958 cursor = g_list_first(data_queue->raw_data);
959 pims_ipc_raw_data_s *data;
962 data = (pims_ipc_raw_data_s *)cursor->data;
963 cursor = g_list_next(cursor);
964 data_queue->raw_data = g_list_delete_link(data_queue->raw_data, l);
965 __free_raw_data(data);
967 g_list_free(data_queue->raw_data);
968 pthread_mutex_unlock(&data_queue->raw_data_mutex);
969 pthread_mutex_destroy(&data_queue->raw_data_mutex);
974 static int __send_identify(int fd, unsigned int seq_no, char *id, int id_len)
976 int len = sizeof(unsigned int) // total size
977 + id_len + sizeof(unsigned int) // id
978 + sizeof(unsigned int); // seq_no
983 memset(buf, 0x0, len+1);
986 memcpy(buf, (void*)&len, sizeof(unsigned int));
987 length += sizeof(unsigned int);
990 memcpy(buf+length, (void*)&(id_len), sizeof(unsigned int));
991 length += sizeof(unsigned int);
992 memcpy(buf+length, (void*)(id), id_len);
996 memcpy(buf+length, (void*)&(seq_no), sizeof(unsigned int));
997 length += sizeof(unsigned int);
999 return socket_send(fd, buf, length);
1002 static int __recv_raw_data(int fd, pims_ipc_raw_data_s **data, bool *identity)
1005 pims_ipc_raw_data_s *temp;
1007 /* read the size of message. note that ioctl is non-blocking */
1008 if (ioctl(fd, FIONREAD, &len)) {
1009 ERROR("ioctl failed: %d", errno);
1013 /* when server or client closed socket */
1015 INFO("[IPC Socket] connection is closed");
1019 temp = (pims_ipc_raw_data_s*)calloc(1, sizeof(pims_ipc_raw_data_s));
1020 temp->client_id = NULL;
1021 temp->client_id_len = 0;
1022 temp->call_id = NULL;
1023 temp->call_id_len = 0;
1025 temp->is_data = FALSE;
1030 unsigned int read_len = 0;
1031 unsigned int total_len = 0;
1032 unsigned int is_data = FALSE;
1036 ret = read(fd, (void *)&total_len, sizeof(unsigned int));
1037 if (ret < 0) { ERROR("read error"); break; }
1041 ret = read(fd, (void *)&(temp->client_id_len), sizeof(unsigned int));
1042 if (ret < 0) { ERROR("read error"); break; }
1045 temp->client_id = calloc(1, temp->client_id_len+1);
1046 ret = socket_recv(fd, (void *)&(temp->client_id), temp->client_id_len);
1047 if (ret < 0) { ERROR("socket_recv error"); break; }
1051 ret = read(fd, (void *)&(temp->seq_no), sizeof(unsigned int));
1052 if (ret < 0) { ERROR("read error"); break; }
1055 if (total_len == read_len) {
1062 ret = read(fd, (void *)&(temp->call_id_len), sizeof(unsigned int));
1063 if (ret < 0) { ERROR("read error"); break; }
1066 temp->call_id = calloc(1, temp->call_id_len+1);
1067 ret = socket_recv(fd, (void *)&(temp->call_id), temp->call_id_len);
1068 if (ret < 0) { ERROR("socket_recv error"); break; }
1072 ret = read(fd, (void *)&(is_data), sizeof(unsigned int));
1073 if (ret < 0) { ERROR("read error"); break; }
1078 temp->is_data = TRUE;
1079 ret = read(fd, (void *)&(temp->data_len), sizeof(unsigned int));
1080 if (ret < 0) { ERROR("read error"); break; }
1083 temp->data = calloc(1, temp->data_len+1);
1084 ret = socket_recv(fd, (void *)&(temp->data), temp->data_len);
1085 if (ret < 0) { ERROR("socket_recv error"); break; }
1089 INFO("client_id : %s, call_id : %s, seq_no : %d", temp->client_id, temp->call_id, temp->seq_no);
1096 __free_raw_data(temp);
1104 static gboolean __request_handler(GIOChannel *src, GIOCondition condition, gpointer data)
1107 int event_fd = g_io_channel_unix_get_fd(src);
1108 char *client_id = NULL;
1109 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1111 if (G_IO_HUP & condition) {
1112 INFO("client closed ------------------------client_fd : %d", event_fd);
1117 __find_client_id(ipc_svc, event_fd, true, &client_id);
1119 // Send client_id to manager to terminate worker thread
1121 pthread_mutex_lock(&ipc_svc->manager_queue_from_epoll_mutex);
1122 ipc_svc->manager_queue_from_epoll = g_list_append(ipc_svc->manager_queue_from_epoll, (void*)g_strdup(client_id));
1123 pthread_mutex_unlock(&ipc_svc->manager_queue_from_epoll_mutex);
1124 write_command(ipc_svc->manager, 1);
1126 __delete_request_queue(ipc_svc, client_id);
1133 // receive data from client
1135 bool identity = false;
1136 pims_ipc_raw_data_s *req = NULL;
1138 recv_len = __recv_raw_data(event_fd, &req, &identity);
1140 // send command to router
1142 pims_ipc_client_map_s *client = (pims_ipc_client_map_s*)calloc(1, sizeof(pims_ipc_client_map_s));
1143 client->fd = event_fd;
1144 client->id = req->client_id;
1145 req->client_id = NULL;
1146 ipc_svc->client_id_fd_map = g_list_append(ipc_svc->client_id_fd_map, client);
1148 // send server pid to client
1150 snprintf(temp, sizeof(temp), "%x", getpid());
1151 ret = __send_identify(event_fd, req->seq_no, temp, strlen(temp));
1153 __free_raw_data(req);
1159 __find_client_id(ipc_svc, event_fd, false, &client_id);
1162 __request_push(ipc_svc, client_id, event_fd, req);
1163 write_command(ipc_svc->router, 1);
1166 ERROR("__find_client_id fail : event_fd (%d)", event_fd);
1169 ERROR("receive invalid : %d", event_fd);
1177 static gboolean __socket_handler(GIOChannel *src, GIOCondition condition, gpointer data)
1179 GIOChannel *channel;
1180 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1181 int client_sockfd = -1;
1182 int sockfd = ipc_svc->sockfd;
1183 struct sockaddr_un clientaddr;
1184 socklen_t client_len = sizeof(clientaddr);
1186 client_sockfd = accept(sockfd, (struct sockaddr *)&clientaddr, &client_len);
1187 if (-1 == client_sockfd) {
1188 ERROR("accept error : %s", strerror(errno));
1192 channel = g_io_channel_unix_new(client_sockfd);
1193 g_io_add_watch(channel, G_IO_IN|G_IO_HUP, __request_handler, data);
1194 g_io_channel_unref(channel);
1199 static void* __main_loop(void *user_data)
1202 struct sockaddr_un addr;
1203 GIOChannel *gio = NULL;
1204 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)user_data;
1206 if (sd_listen_fds(1) == 1 && sd_is_socket_unix(SD_LISTEN_FDS_START, SOCK_STREAM, -1, ipc_svc->service, 0) > 0) {
1207 ipc_svc->sockfd = SD_LISTEN_FDS_START;
1210 unlink(ipc_svc->service);
1211 ipc_svc->sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
1213 bzero(&addr, sizeof(addr));
1214 addr.sun_family = AF_UNIX;
1215 snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service);
1217 ret = bind(ipc_svc->sockfd, (struct sockaddr *)&addr, sizeof(addr));
1219 ERROR("bind error :%d", ret);
1220 ret = listen(ipc_svc->sockfd, 30);
1222 ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
1223 ret = chmod(ipc_svc->service, ipc_svc->mode);
1226 gio = g_io_channel_unix_new(ipc_svc->sockfd);
1228 g_io_add_watch(gio, G_IO_IN, __socket_handler, (gpointer)ipc_svc);
1233 static int __open_router_fd(pims_ipc_svc_s *ipc_svc)
1240 // router inproc eventfd
1241 router = eventfd(0,0);
1243 ERROR("eventfd error : %d", errno);
1246 VERBOSE("router :%d\n", router);
1248 flags = fcntl(router, F_GETFL, 0);
1251 ret = fcntl (router, F_SETFL, flags | O_NONBLOCK);
1252 VERBOSE("rounter fcntl : %d\n", ret);
1254 // manager inproc eventfd
1255 manager = eventfd(0,0);
1256 if (-1 == manager) {
1257 ERROR("eventfd error : %d", errno);
1261 VERBOSE("manager :%d\n", manager);
1263 flags = fcntl(manager, F_GETFL, 0);
1266 ret = fcntl (manager, F_SETFL, flags | O_NONBLOCK);
1267 VERBOSE("manager fcntl : %d\n", ret);
1269 ipc_svc->router = router;
1270 ipc_svc->manager = manager;
1275 static void __close_router_fd(pims_ipc_svc_s *ipc_svc)
1277 close(ipc_svc->router);
1278 close(ipc_svc->manager);
1281 static void* __publish_loop(void *user_data)
1286 struct sockaddr_un addr;
1287 struct epoll_event ev = {0};
1288 pims_ipc_svc_for_publish_s *ipc_svc = (pims_ipc_svc_for_publish_s*)user_data;
1290 unlink(ipc_svc->service);
1291 ipc_svc->publish_sockfd = socket(PF_UNIX, SOCK_STREAM, 0);
1293 bzero(&addr, sizeof(struct sockaddr_un));
1294 addr.sun_family = AF_UNIX;
1295 snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", ipc_svc->service);
1297 int flags = fcntl (ipc_svc->publish_sockfd, F_GETFL, 0);
1300 ret = fcntl (ipc_svc->publish_sockfd, F_SETFL, flags | O_NONBLOCK);
1301 VERBOSE("publish socketfd fcntl : %d\n", ret);
1303 ret = bind(ipc_svc->publish_sockfd, (struct sockaddr *)&(addr), sizeof(struct sockaddr_un));
1305 ERROR("bind error :%d", ret);
1306 ret = listen(ipc_svc->publish_sockfd, 30);
1307 WARN_IF(ret != 0, "listen error :%d", ret);
1309 ret = chown(ipc_svc->service, getuid(), ipc_svc->group);
1310 WARN_IF(ret != 0, "listen error :%d", ret);
1311 ret = chmod(ipc_svc->service, ipc_svc->mode);
1312 WARN_IF(ret != 0, "listen error :%d", ret);
1314 epfd = epoll_create(MAX_EPOLL_EVENT);
1316 ev.events = EPOLLIN | EPOLLHUP;
1317 ev.data.fd = ipc_svc->publish_sockfd;
1319 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ipc_svc->publish_sockfd, &ev);
1320 WARN_IF(ret != 0, "listen error :%d", ret);
1322 while (!ipc_svc->epoll_stop_thread) {
1324 struct epoll_event events[MAX_EPOLL_EVENT] = {{0}, };
1325 int event_num = epoll_wait(epfd, events, MAX_EPOLL_EVENT, -1);
1327 if (ipc_svc->epoll_stop_thread)
1330 if (event_num == -1) {
1331 if (errno != EINTR) {
1332 ERROR("errno:%d\n", errno);
1337 for (i = 0; i < event_num; i++) {
1338 int event_fd = events[i].data.fd;
1340 if (events[i].events & EPOLLHUP) {
1341 VERBOSE("client closed -----------------------------------------:%d", event_fd);
1342 if (epoll_ctl(epfd, EPOLL_CTL_DEL, event_fd, events) == -1) {
1343 ERROR("epoll_ctl (EPOLL_CTL_DEL) fail : errno(%d)", errno);
1347 // Find client_id and delete
1348 GList *cursor = NULL;
1350 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
1351 cursor = g_list_first(ipc_svc->subscribe_fds);
1353 if (event_fd == (int)cursor->data) {
1354 ipc_svc->subscribe_fds = g_list_delete_link(ipc_svc->subscribe_fds, cursor);
1357 cursor = g_list_next(cursor);
1359 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
1362 else if (event_fd == ipc_svc->publish_sockfd) { // connect client
1363 struct sockaddr_un remote;
1364 int remote_len = sizeof(remote);
1365 int client_fd = accept(ipc_svc->publish_sockfd, (struct sockaddr *)&remote, (socklen_t*) &remote_len);
1366 if (client_fd == -1) {
1367 ERROR("accept fail : errno : %d", errno);
1370 VERBOSE("client subscriber connect: %d", client_fd);
1372 pthread_mutex_lock(&ipc_svc->subscribe_fds_mutex);
1373 ipc_svc->subscribe_fds = g_list_append(ipc_svc->subscribe_fds, (void*)client_fd);
1374 pthread_mutex_unlock(&ipc_svc->subscribe_fds_mutex);
1376 ev.events = EPOLLIN;
1377 ev.data.fd = client_fd;
1378 if (epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
1379 ERROR("epoll_ctl (EPOLL_CTL_ADD) fail : error(%d)\n", errno);
1386 close(ipc_svc->publish_sockfd);
1392 static void __stop_for_publish(pims_ipc_svc_for_publish_s *ipc_svc)
1394 ipc_svc->epoll_stop_thread = true;
1397 static void* __router_loop(void *data)
1399 pims_ipc_svc_s *ipc_svc = (pims_ipc_svc_s*)data;
1401 struct pollfd *pollfds;
1403 pollfds = (struct pollfd*) malloc (fd_count * sizeof (struct pollfd));
1405 pollfds[0].fd = ipc_svc->router;
1406 pollfds[0].events = POLLIN;
1407 pollfds[1].fd = ipc_svc->manager;
1408 pollfds[1].events = POLLIN;
1413 int check_router_queue = -1;
1414 int check_manager_queue = -1;
1417 ret = poll(pollfds, fd_count, 1000);
1418 if (ret == -1 && errno == EINTR) {
1420 continue; //return NULL;
1426 if (pollfds[0].revents & POLLIN) {
1427 // request router: send request to worker
1428 if (sizeof (dummy) == read_command(pollfds[0].fd, &dummy)) {
1429 check_router_queue = __process_router_event(ipc_svc, false);
1433 if (pollfds[1].revents & POLLIN) {
1435 if (sizeof (dummy) == read_command(pollfds[1].fd, &dummy)) {
1436 check_manager_queue = __process_manager_event(ipc_svc);
1437 if (ipc_svc->delay_count > 0)
1438 check_router_queue = __process_router_event(ipc_svc, true);
1444 while(check_router_queue > 0 || check_manager_queue > 0) {
1445 read_command(pollfds[0].fd, &dummy);
1446 check_router_queue = __process_router_event(ipc_svc, false);
1448 read_command(pollfds[1].fd, &dummy);
1449 check_manager_queue = __process_manager_event(ipc_svc);
1450 if (ipc_svc->delay_count > 0)
1451 check_router_queue = __process_router_event(ipc_svc, true);
1460 API void pims_ipc_svc_run_main_loop(GMainLoop* loop)
1463 GMainLoop* main_loop = loop;
1465 if (main_loop == NULL) {
1466 main_loop = g_main_loop_new(NULL, FALSE);
1469 if (_g_singleton_for_publish)
1470 __launch_thread(__publish_loop, _g_singleton_for_publish);
1473 ret = __open_router_fd(_g_singleton);
1477 // launch worker threads in advance
1478 for (i = 0; i < _g_singleton->workers_max_count; i++)
1479 __launch_thread(__worker_loop, _g_singleton);
1481 __launch_thread(__router_loop, _g_singleton);
1482 __main_loop(_g_singleton);
1485 g_main_loop_run(main_loop);
1488 __close_router_fd(_g_singleton);
1490 if (_g_singleton_for_publish)
1491 __stop_for_publish(_g_singleton_for_publish);
1495 API void pims_ipc_svc_set_client_disconnected_cb(pims_ipc_svc_client_disconnected_cb callback, void *user_data)
1497 if (_client_disconnected_cb.callback) {
1498 ERROR("already registered");
1501 _client_disconnected_cb.callback = callback;
1502 _client_disconnected_cb.user_data = user_data;