4 * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
6 * Licensed under the Apache License, Version 2.0 (the License);
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an AS IS BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <sys/eventfd.h>
29 #include "pims-internal.h"
30 #include "pims-ipc-data-internal.h"
31 #include "pims-ipc-data.h"
32 #include "pims-socket.h"
33 #include "pims-ipc-utils.h"
34 #include "pims-ipc-worker.h"
36 #define PIMS_IPC_WORKER_THREAD_WAIT_TIME 100 /* milliseconds */
39 pims_ipc_svc_client_disconnected_cb callback;
41 } pims_ipc_svc_client_disconnected_cb_t;
43 /* idle_worker_pool SHOULD handle on main thread */
44 static GList *idle_worker_pool;
45 static GHashTable *worker_cb_table; /* call_id, cb_data */
46 static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL};
48 static int unique_sequence_number;
49 static GHashTable *worker_client_info_map; /* key : worker_id, data : pims_ipc_client_info_s* */
50 static int client_register_info(pims_ipc_worker_data_s *worker_data, int client_pid);
51 static pthread_mutex_t _worker_client_mutex = PTHREAD_MUTEX_INITIALIZER; /* for worker_client_info_map */
53 int worker_wait_idle_worker_ready(pims_ipc_worker_data_s *worker_data)
55 struct timespec timeout = {0};
57 clock_gettime(CLOCK_REALTIME, &timeout);
58 timeout.tv_nsec += PIMS_IPC_WORKER_THREAD_WAIT_TIME * 1000000;
59 timeout.tv_sec += timeout.tv_nsec / 1000000000L;
60 timeout.tv_nsec = timeout.tv_nsec % 1000000000L;
62 pthread_mutex_lock(&worker_data->ready_mutex);
64 if (!worker_data->fd) {
65 WARN("worker fd is null, wait until worker thread create done.");
66 if (pthread_cond_timedwait(&worker_data->ready, &worker_data->ready_mutex, &timeout)) {
67 ERR("Get idle worker timeout Fail!");
68 pthread_mutex_unlock(&worker_data->ready_mutex);
73 pthread_mutex_unlock(&worker_data->ready_mutex);
77 pims_ipc_worker_data_s* worker_get_idle_worker(pims_ipc_svc_s *ipc_svc,
78 const char *client_id)
80 pims_ipc_worker_data_s *worker_data;
82 RETV_IF(NULL == client_id, NULL);
83 RETVM_IF(NULL == idle_worker_pool, NULL, "There is no idle worker");
85 worker_data = g_hash_table_lookup(ipc_svc->client_worker_map, client_id);
89 worker_data = idle_worker_pool->data;
91 idle_worker_pool = g_list_delete_link(idle_worker_pool, idle_worker_pool);
94 g_hash_table_insert(ipc_svc->client_worker_map, g_strdup(client_id), worker_data);
99 pims_ipc_worker_data_s* worker_find(pims_ipc_svc_s *ipc_svc, const char *client_id)
101 pims_ipc_worker_data_s *worker_data;
103 RETV_IF(NULL == client_id, NULL);
105 if (FALSE == g_hash_table_lookup_extended(ipc_svc->client_worker_map, client_id,
106 NULL, (gpointer*)&worker_data)) {
107 ERR("g_hash_table_lookup_extended(%s) Fail", client_id);
114 void worker_stop_client_worker(pims_ipc_svc_s *ipc_svc, const char *client_id)
116 pims_ipc_worker_data_s *worker_data;
118 worker_data = worker_find(ipc_svc, client_id);
120 /* remove client_fd */
121 g_hash_table_remove(ipc_svc->client_worker_map, client_id);
123 /* stop worker thread */
125 worker_data->stop_thread = TRUE;
126 worker_data->client_fd = -1;
127 write_command(worker_data->fd, 1);
128 DBG("write command to worker terminate(worker_fd:%d)", worker_data->fd);
133 void worker_free_raw_data(void *data)
135 pims_ipc_raw_data_s *raw_data = data;
137 if (NULL == raw_data)
140 free(raw_data->client_id);
141 free(raw_data->call_id);
142 free(raw_data->data);
146 void worker_free_data(gpointer data)
148 pims_ipc_worker_data_s *worker_data = data;
150 pthread_mutex_lock(&worker_data->queue_mutex);
151 if (worker_data->list)
152 g_list_free_full(worker_data->list, worker_free_raw_data);
153 pthread_mutex_unlock(&worker_data->queue_mutex);
155 pthread_cond_destroy(&worker_data->ready);
156 pthread_mutex_destroy(&worker_data->ready_mutex);
162 int worker_push_raw_data(pims_ipc_worker_data_s *worker_data, int client_fd,
163 pims_ipc_raw_data_s *data)
165 pthread_mutex_lock(&worker_data->queue_mutex);
166 worker_data->list = g_list_append(worker_data->list, data);
167 worker_data->client_fd = client_fd;
168 pthread_mutex_unlock(&worker_data->queue_mutex);
173 static gboolean worker_pop_raw_data(pims_ipc_worker_data_s *worker,
174 pims_ipc_raw_data_s **data)
179 pthread_mutex_lock(&worker->queue_mutex);
181 pthread_mutex_unlock(&worker->queue_mutex);
186 GList *cursor = g_list_first(worker->list);
187 if (NULL == cursor) {
188 pthread_mutex_unlock(&worker->queue_mutex);
192 *data = cursor->data;
193 worker->list = g_list_delete_link(worker->list, g_list_first(worker->list));
194 pthread_mutex_unlock(&worker->queue_mutex);
199 int worker_set_callback(char *call_id, pims_ipc_svc_cb_s *cb_data)
201 return g_hash_table_insert(worker_cb_table, call_id, cb_data);
204 static void __run_callback(int client_pid, char *call_id, pims_ipc_data_h dhandle_in,
205 pims_ipc_data_h *dhandle_out)
207 pims_ipc_svc_cb_s *cb_data = NULL;
209 VERBOSE("Call id [%s]", call_id);
211 cb_data = g_hash_table_lookup(worker_cb_table, call_id);
212 if (cb_data == NULL) {
213 VERBOSE("No Data for %s", call_id);
217 /* TODO: client_pid is not valide pims_ipc_h */
218 cb_data->callback((pims_ipc_h)client_pid, dhandle_in, dhandle_out, cb_data->user_data);
221 static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data,
222 pims_ipc_raw_data_s **out)
224 pims_ipc_data_s *data_in = data;
225 pims_ipc_raw_data_s *raw_data = NULL;
228 RET_IF(NULL == call_id);
230 raw_data = calloc(1, sizeof(pims_ipc_raw_data_s));
231 if (NULL == raw_data) {
232 ERR("calloc() Fail(%d)", errno);
236 raw_data->call_id = g_strdup(call_id);
237 raw_data->call_id_len = strlen(raw_data->call_id);
238 raw_data->seq_no = seq_no;
240 if (data_in && 0 < data_in->buf_size) {
241 raw_data->has_data = TRUE;
242 raw_data->data = calloc(1, data_in->buf_size+1);
243 if (NULL == raw_data->data) {
244 ERR("calloc() Fail");
245 free(raw_data->call_id);
249 memcpy(raw_data->data, data_in->buf, data_in->buf_size);
250 raw_data->data_len = data_in->buf_size;
252 raw_data->has_data = FALSE;
253 raw_data->data_len = 0;
254 raw_data->data = NULL;
260 static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data)
263 unsigned int len, total_len, client_id_len;
265 RETV_IF(NULL == data, -1);
266 RETV_IF(NULL == client_id, -1);
268 client_id_len = strlen(client_id);
270 len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(data->seq_no)
271 + data->call_id_len + sizeof(data->call_id) + sizeof(data->has_data);
274 if (data->has_data) {
275 len += sizeof(data->data_len);
276 total_len = len + data->data_len;
279 INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id,
280 data->call_id, data->seq_no, len, total_len);
284 memset(buf, 0x0, len+1);
286 memcpy(buf, &total_len, sizeof(total_len));
287 length += sizeof(total_len);
289 memcpy(buf+length, &client_id_len, sizeof(client_id_len));
290 length += sizeof(client_id_len);
291 memcpy(buf+length, client_id, client_id_len);
292 length += client_id_len;
294 memcpy(buf+length, &(data->seq_no), sizeof(data->seq_no));
295 length += sizeof(data->seq_no);
297 memcpy(buf+length, &(data->call_id_len), sizeof(data->call_id_len));
298 length += sizeof(data->call_id_len);
299 memcpy(buf+length, data->call_id, data->call_id_len);
300 length += data->call_id_len;
302 memcpy(buf+length, &(data->has_data), sizeof(data->has_data));
303 length += sizeof(data->has_data);
305 if (data->has_data) {
306 memcpy(buf+length, &(data->data_len), sizeof(data->data_len));
307 length += sizeof(data->data_len);
308 ret = socket_send(fd, buf, length);
312 ret += socket_send_data(fd, data->data, data->data_len);
314 ret = socket_send(fd, buf, length);
320 static int _get_pid_from_fd(int fd)
323 socklen_t uc_len = sizeof(uc);
325 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &uc_len) < 0)
326 ERR("getsockopt() Failed(%d)", errno);
328 DBG("Client PID(%d)", uc.pid);
332 static int __worker_loop_handle_raw_data(pims_ipc_worker_data_s *worker_data)
334 int disconnected = FALSE;
335 pims_ipc_data_h data_in = NULL;
336 pims_ipc_data_h data_out = NULL;
337 pims_ipc_raw_data_s *result = NULL;
338 pims_ipc_raw_data_s *raw_data = NULL;
340 if (FALSE == worker_pop_raw_data(worker_data, &raw_data))
343 int client_pid = _get_pid_from_fd(worker_data->client_fd);
345 if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id)) {
346 client_register_info(worker_data, client_pid);
348 } else if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id)) {
351 data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len);
353 __run_callback(client_pid, raw_data->call_id, data_in, &data_out);
354 pims_ipc_data_destroy(data_in);
358 __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result);
359 pims_ipc_data_destroy(data_out);
361 __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result);
363 if (worker_data->client_fd != -1)
364 __send_raw_data(worker_data->client_fd, raw_data->client_id, result);
365 worker_free_raw_data(raw_data);
366 worker_free_raw_data(result);
371 static void* __worker_loop(void *data)
376 int disconnected = FALSE;
377 pims_ipc_worker_data_s *worker_data = data;
379 RETV_IF(NULL == data, NULL);
381 worker_fd = eventfd(0, 0);
385 INFO("worker Created ********** worker_fd = %d ***********", worker_fd);
387 pid = pthread_self();
388 worker_data->client_fd = -1;
389 worker_data->stop_thread = FALSE;
390 pthread_mutex_lock(&worker_data->ready_mutex);
391 worker_data->fd = worker_fd;
392 pthread_cond_signal(&worker_data->ready);
393 pthread_mutex_unlock(&worker_data->ready_mutex);
395 struct pollfd pollfds[1];
396 pollfds[0].fd = worker_fd;
397 pollfds[0].events = POLLIN;
398 pollfds[0].revents = 0;
400 while (!worker_data->stop_thread) {
401 ret = poll(pollfds, 1, 3000); /* waiting command from router */
404 ERR("poll() Fail(%d)", errno);
407 if (worker_data->stop_thread)
413 if (pollfds[0].revents & POLLIN) {
415 read_command(pollfds[0].fd, &dummy);
417 disconnected = __worker_loop_handle_raw_data(worker_data);
422 ERR("client fd closed, worker_fd : %d", worker_fd);
423 INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd);
425 int flag = fcntl(worker_data->client_fd, F_GETFL, 0);
426 if (0 == (FD_CLOEXEC & flag)) {
427 int client_pid = _get_pid_from_fd(worker_data->client_fd);
428 pthread_mutex_lock(&_worker_client_mutex);
429 g_hash_table_remove(worker_client_info_map, GINT_TO_POINTER(client_pid));
430 DBG("client pid(%u) is removed", client_pid);
431 pthread_mutex_unlock(&_worker_client_mutex);
433 DBG("fd(%d) is already closed", worker_data->client_fd);
436 worker_free_data(worker_data);
439 if (_client_disconnected_cb.callback)
440 _client_disconnected_cb.callback((pims_ipc_h)pid, _client_disconnected_cb.user_data);
445 void worker_start_idle_worker(pims_ipc_svc_s *ipc_data)
448 pims_ipc_worker_data_s *worker_data;
450 for (i = g_list_length(idle_worker_pool); i < ipc_data->workers_max_count; i++) {
451 worker_data = calloc(1, sizeof(pims_ipc_worker_data_s));
452 if (NULL == worker_data) {
453 ERR("calloc() Fail(%d)", errno);
456 pthread_mutex_init(&worker_data->queue_mutex, 0);
457 pthread_mutex_init(&worker_data->ready_mutex, NULL);
458 pthread_cond_init(&worker_data->ready, NULL);
460 utils_launch_thread(__worker_loop, worker_data);
461 idle_worker_pool = g_list_append(idle_worker_pool, worker_data);
465 void worker_stop_idle_worker()
468 pims_ipc_worker_data_s *worker_data;
470 cursor = idle_worker_pool;
472 worker_data = cursor->data;
473 worker_data->stop_thread = TRUE;
474 write_command(worker_data->fd, 1);
475 cursor = cursor->next;
481 worker_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
482 WARN_IF(NULL == worker_cb_table, "worker cb table is NULL");
487 g_list_free(idle_worker_pool);
488 idle_worker_pool = NULL;
490 g_hash_table_destroy(worker_cb_table);
491 worker_cb_table = NULL;
494 API void pims_ipc_svc_set_client_disconnected_cb(
495 pims_ipc_svc_client_disconnected_cb callback, void *user_data)
497 if (_client_disconnected_cb.callback) {
498 ERR("already registered");
501 _client_disconnected_cb.callback = callback;
502 _client_disconnected_cb.user_data = user_data;
505 void client_destroy_info(gpointer p)
507 pims_ipc_client_info_s *client_info = p;
509 if (NULL == client_info)
511 free(client_info->smack);
512 free(client_info->uid);
513 free(client_info->client_session);
517 void client_init(void)
519 pthread_mutex_init(&_worker_client_mutex, 0);
521 pthread_mutex_lock(&_worker_client_mutex);
522 unique_sequence_number = 0;
523 if (worker_client_info_map) {
524 ERR("worker_client_info_map already exists");
525 pthread_mutex_unlock(&_worker_client_mutex);
529 worker_client_info_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
530 client_destroy_info);
531 pthread_mutex_unlock(&_worker_client_mutex);
534 void client_deinit(void)
536 DBG("----------destroied");
537 pthread_mutex_lock(&_worker_client_mutex);
538 g_hash_table_destroy(worker_client_info_map);
539 worker_client_info_map = NULL;
540 pthread_mutex_unlock(&_worker_client_mutex);
541 pthread_mutex_destroy(&_worker_client_mutex);
544 static int _create_client_info(int fd, pims_ipc_client_info_s **p_client_info)
548 char errmsg[1024] = {0};
550 pims_ipc_client_info_s *client_info = calloc(1, sizeof(pims_ipc_client_info_s));
551 if (NULL == client_info) {
552 ERR("calloc() return NULL");
556 ret = cynara_creds_socket_get_client(fd, CLIENT_METHOD_SMACK, &(client_info->smack));
557 if (CYNARA_API_SUCCESS != ret) {
558 cynara_strerror(ret, errmsg, sizeof(errmsg));
559 ERR("cynara_creds_socket_get_client() Fail(%d,%s)", ret, errmsg);
560 client_destroy_info(client_info);
564 ret = cynara_creds_socket_get_user(fd, USER_METHOD_UID, &(client_info->uid));
565 if (CYNARA_API_SUCCESS != ret) {
566 cynara_strerror(ret, errmsg, sizeof(errmsg));
567 ERR("cynara_creds_socket_get_user() Fail(%d,%s)", ret, errmsg);
568 client_destroy_info(client_info);
572 ret = cynara_creds_socket_get_pid(fd, &pid);
573 if (CYNARA_API_SUCCESS != ret) {
574 cynara_strerror(ret, errmsg, sizeof(errmsg));
575 ERR("cynara_creds_socket_get_pid() Fail(%d,%s)", ret, errmsg);
576 client_destroy_info(client_info);
580 client_info->client_session = cynara_session_from_pid(pid);
581 if (NULL == client_info->client_session) {
582 WARN("cynara_session_from_pid() return NULL");
583 client_info->client_session = strdup("");
585 *p_client_info = client_info;
590 static int client_register_info(pims_ipc_worker_data_s *worker_data, int client_pid)
592 pims_ipc_client_info_s *client_info = NULL;
595 ret = _create_client_info(worker_data->client_fd, &client_info);
597 ERR("_create_client_info() Fail(%d)", ret);
600 pthread_mutex_lock(&_worker_client_mutex);
601 g_hash_table_insert(worker_client_info_map, GINT_TO_POINTER(client_pid), client_info);
602 DBG("-------inserted:pid(%d), info(%p)", client_pid, client_info);
603 pthread_mutex_unlock(&_worker_client_mutex);
608 int client_get_unique_sequence_number(void)
610 return unique_sequence_number++;
613 pims_ipc_client_info_s* client_clone_info(pims_ipc_client_info_s *client_info)
615 if (NULL == client_info) {
616 ERR("client_info is NULL");
620 pims_ipc_client_info_s *clone = calloc(1, sizeof(pims_ipc_client_info_s));
622 ERR("calloc() Fail");
626 if (client_info->smack) {
627 clone->smack = strdup(client_info->smack);
628 if (NULL == clone->smack) {
629 ERR("strdup() Fail");
630 client_destroy_info(clone);
635 if (client_info->uid) {
636 clone->uid = strdup(client_info->uid);
637 if (NULL == clone->uid) {
638 ERR("strdup() Fail");
639 client_destroy_info(clone);
644 if (client_info->client_session) {
645 clone->client_session = strdup(client_info->client_session);
646 if (NULL == clone->client_session) {
647 ERR("strdup() Fail");
648 client_destroy_info(clone);
656 pims_ipc_client_info_s* client_get_info(int client_pid)
658 pims_ipc_client_info_s *client_info = NULL;
659 pims_ipc_client_info_s *client_info_clone = NULL;
661 pthread_mutex_lock(&_worker_client_mutex);
662 client_info = g_hash_table_lookup(worker_client_info_map, GINT_TO_POINTER(client_pid));
663 client_info_clone = client_clone_info(client_info);
664 pthread_mutex_unlock(&_worker_client_mutex);
665 DBG("Get client_info(%p) from pid(%d)", client_info, client_pid);
666 return client_info_clone;