clean up: divided files, remove router
[platform/core/pim/pims-ipc.git] / src / pims-ipc-worker.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <pthread.h>
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <poll.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <sys/eventfd.h>
26
27 #include <glib.h>
28
29 #include "pims-internal.h"
30 #include "pims-ipc-data-internal.h"
31 #include "pims-ipc-data.h"
32 #include "pims-socket.h"
33 #include "pims-ipc-utils.h"
34 #include "pims-ipc-worker.h"
35
36 #define PIMS_IPC_WORKER_THREAD_WAIT_TIME 100 /* milliseconds */
37
38 typedef struct {
39         pims_ipc_svc_client_disconnected_cb callback;
40         void * user_data;
41 } pims_ipc_svc_client_disconnected_cb_t;
42
43 /* idle_worker_pool SHOULD handle on main thread */
44 static GList *idle_worker_pool;
45 static GHashTable *worker_cb_table; /* call_id, cb_data */
46 static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL};
47
48 static int unique_sequence_number;
49 static GHashTable *worker_client_info_map; /* key : worker_id, data : pims_ipc_client_info_s* */
50
51 int worker_wait_idle_worker_ready(pims_ipc_worker_data_s *worker_data)
52 {
53         struct timespec timeout = {0};
54
55         clock_gettime(CLOCK_REALTIME, &timeout);
56         timeout.tv_nsec += PIMS_IPC_WORKER_THREAD_WAIT_TIME * 1000000;
57         timeout.tv_sec += timeout.tv_nsec / 1000000000L;
58         timeout.tv_nsec = timeout.tv_nsec % 1000000000L;
59
60         pthread_mutex_lock(&worker_data->ready_mutex);
61
62         if (!worker_data->fd) {
63                 WARN("worker fd is null, wait until worker thread create done.");
64                 if (pthread_cond_timedwait(&worker_data->ready, &worker_data->ready_mutex, &timeout)) {
65                         ERR("Get idle worker timeout Fail!");
66                         pthread_mutex_unlock(&worker_data->ready_mutex);
67                         return -1;
68                 }
69         }
70
71         pthread_mutex_unlock(&worker_data->ready_mutex);
72         return 0;
73 }
74
75 pims_ipc_worker_data_s* worker_get_idle_worker(pims_ipc_svc_s *ipc_svc,
76                 const char *client_id)
77 {
78         pims_ipc_worker_data_s *worker_data;
79
80         RETV_IF(NULL == client_id, NULL);
81         RETVM_IF(NULL == idle_worker_pool, NULL, "There is no idle worker");
82
83         worker_data = g_hash_table_lookup(ipc_svc->client_worker_map, client_id);
84         if (worker_data)
85                 return worker_data;
86
87         worker_data = idle_worker_pool->data;
88
89         idle_worker_pool = g_list_delete_link(idle_worker_pool, idle_worker_pool);
90
91         if (worker_data)
92                 g_hash_table_insert(ipc_svc->client_worker_map, g_strdup(client_id), worker_data);
93
94         return worker_data;
95 }
96
97 pims_ipc_worker_data_s* worker_find(pims_ipc_svc_s *ipc_svc, const char *client_id)
98 {
99         pims_ipc_worker_data_s *worker_data;
100
101         RETV_IF(NULL == client_id, NULL);
102
103         if (FALSE == g_hash_table_lookup_extended(ipc_svc->client_worker_map, client_id,
104                                 NULL, (gpointer*)&worker_data)) {
105                 ERR("g_hash_table_lookup_extended(%s) Fail", client_id);
106                 return NULL;
107         }
108
109         return worker_data;
110 }
111
112 void worker_stop_client_worker(pims_ipc_svc_s *ipc_svc, const char *client_id)
113 {
114         pims_ipc_worker_data_s *worker_data;
115
116         worker_data = worker_find(ipc_svc, client_id);
117
118         /* remove client_fd */
119         g_hash_table_remove(ipc_svc->client_worker_map, client_id);
120
121         /* stop worker thread */
122         if (worker_data) {
123                 worker_data->stop_thread = TRUE;
124                 worker_data->client_fd = -1;
125                 write_command(worker_data->fd, 1);
126                 DBG("write command to worker terminate(worker_fd:%d)", worker_data->fd);
127         }
128 }
129
130
131 void worker_free_raw_data(void *data)
132 {
133         pims_ipc_raw_data_s *raw_data = data;
134
135         if (NULL == raw_data)
136                 return;
137
138         free(raw_data->client_id);
139         free(raw_data->call_id);
140         free(raw_data->data);
141         free(raw_data);
142 }
143
144 void worker_free_data(gpointer data)
145 {
146         pims_ipc_worker_data_s *worker_data = data;
147
148         pthread_mutex_lock(&worker_data->queue_mutex);
149         if (worker_data->list)
150                 g_list_free_full(worker_data->list, worker_free_raw_data);
151         pthread_mutex_unlock(&worker_data->queue_mutex);
152
153         pthread_cond_destroy(&worker_data->ready);
154         pthread_mutex_destroy(&worker_data->ready_mutex);
155
156         free(worker_data);
157 }
158
159
160 int worker_push_raw_data(pims_ipc_worker_data_s *worker_data, int client_fd,
161                 pims_ipc_raw_data_s *data)
162 {
163         pthread_mutex_lock(&worker_data->queue_mutex);
164         worker_data->list = g_list_append(worker_data->list, data);
165         worker_data->client_fd = client_fd;
166         pthread_mutex_unlock(&worker_data->queue_mutex);
167
168         return TRUE;
169 }
170
171 static gboolean worker_pop_raw_data(pims_ipc_worker_data_s *worker,
172                 pims_ipc_raw_data_s **data)
173 {
174         if (!worker)
175                 return FALSE;
176
177         pthread_mutex_lock(&worker->queue_mutex);
178         if (!worker->list) {
179                 pthread_mutex_unlock(&worker->queue_mutex);
180                 *data = NULL;
181                 return FALSE;
182         }
183
184         *data = g_list_first(worker->list)->data;
185         worker->list = g_list_delete_link(worker->list, g_list_first(worker->list));
186         pthread_mutex_unlock(&worker->queue_mutex);
187
188         return TRUE;
189 }
190
191 int worker_set_callback(char *call_id, pims_ipc_svc_cb_s *cb_data)
192 {
193         return g_hash_table_insert(worker_cb_table, call_id, cb_data);
194 }
195
196
197 static void __run_callback(int client_pid, char *call_id, pims_ipc_data_h dhandle_in,
198                 pims_ipc_data_h *dhandle_out)
199 {
200         pims_ipc_svc_cb_s *cb_data = NULL;
201
202         VERBOSE("Call id [%s]", call_id);
203
204         cb_data = g_hash_table_lookup(worker_cb_table, call_id);
205         if (cb_data == NULL) {
206                 VERBOSE("No Data for %s", call_id);
207                 return;
208         }
209
210         /* TODO: client_pid is not valide pims_ipc_h */
211         cb_data->callback((pims_ipc_h)client_pid, dhandle_in, dhandle_out, cb_data->user_data);
212 }
213
214 static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data,
215                 pims_ipc_raw_data_s **out)
216 {
217         pims_ipc_data_s *data_in = data;
218         pims_ipc_raw_data_s *raw_data = NULL;
219
220         RET_IF(NULL == out);
221         RET_IF(NULL == call_id);
222
223         raw_data = calloc(1, sizeof(pims_ipc_raw_data_s));
224         if (NULL == raw_data) {
225                 ERR("calloc() Fail(%d)", errno);
226                 return;
227         }
228
229         raw_data->call_id = g_strdup(call_id);
230         raw_data->call_id_len = strlen(raw_data->call_id);
231         raw_data->seq_no = seq_no;
232
233         if (data_in && 0 < data_in->buf_size) {
234                 raw_data->has_data = TRUE;
235                 raw_data->data = calloc(1, data_in->buf_size+1);
236                 if (NULL == raw_data->data) {
237                         ERR("calloc() Fail");
238                         free(raw_data->call_id);
239                         free(raw_data);
240                         return;
241                 }
242                 memcpy(raw_data->data, data_in->buf, data_in->buf_size);
243                 raw_data->data_len = data_in->buf_size;
244         } else {
245                 raw_data->has_data = FALSE;
246                 raw_data->data_len = 0;
247                 raw_data->data = NULL;
248         }
249         *out = raw_data;
250         return;
251 }
252
253 static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data)
254 {
255         int ret = 0;
256         unsigned int len, total_len, client_id_len;
257
258         RETV_IF(NULL == data, -1);
259         RETV_IF(NULL == client_id, -1);
260
261         client_id_len = strlen(client_id);
262
263         len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(data->seq_no)
264                 + data->call_id_len + sizeof(data->call_id) + sizeof(data->has_data);
265         total_len = len;
266
267         if (data->has_data) {
268                 len += sizeof(data->data_len);
269                 total_len = len + data->data_len;
270         }
271
272         INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id,
273                         data->call_id, data->seq_no, len, total_len);
274
275         int length = 0;
276         char buf[len+1];
277         memset(buf, 0x0, len+1);
278
279         memcpy(buf, &total_len, sizeof(total_len));
280         length += sizeof(total_len);
281
282         memcpy(buf+length, &client_id_len, sizeof(client_id_len));
283         length += sizeof(client_id_len);
284         memcpy(buf+length, client_id, client_id_len);
285         length += client_id_len;
286
287         memcpy(buf+length, &(data->seq_no), sizeof(data->seq_no));
288         length += sizeof(data->seq_no);
289
290         memcpy(buf+length, &(data->call_id_len), sizeof(data->call_id_len));
291         length += sizeof(data->call_id_len);
292         memcpy(buf+length, data->call_id, data->call_id_len);
293         length += data->call_id_len;
294
295         memcpy(buf+length, &(data->has_data), sizeof(data->has_data));
296         length += sizeof(data->has_data);
297
298         if (data->has_data) {
299                 memcpy(buf+length, &(data->data_len), sizeof(data->data_len));
300                 length += sizeof(data->data_len);
301                 ret = socket_send(fd, buf, length);
302
303                 /* send data */
304                 if (ret > 0)
305                         ret += socket_send_data(fd, data->data, data->data_len);
306         } else {
307                 ret = socket_send(fd, buf, length);
308         }
309
310         return ret;
311 }
312
313 static int _get_pid_from_fd(int fd)
314 {
315         struct ucred uc;
316         socklen_t uc_len = sizeof(uc);
317
318         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &uc_len) < 0)
319                 ERR("getsockopt() Failed(%d)", errno);
320
321         DBG("Client PID(%d)", uc.pid);
322         return uc.pid;
323 }
324
325 static int __worker_loop_handle_raw_data(pims_ipc_worker_data_s *worker_data)
326 {
327         int disconnected = FALSE;
328         pims_ipc_data_h data_in = NULL;
329         pims_ipc_data_h data_out = NULL;
330         pims_ipc_raw_data_s *result = NULL;
331         pims_ipc_raw_data_s *raw_data = NULL;
332
333         if (FALSE == worker_pop_raw_data(worker_data, &raw_data))
334                 return disconnected;
335
336         int client_pid = _get_pid_from_fd(worker_data->client_fd);
337
338         if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id)) {
339                 client_register_info(worker_data->client_fd, client_pid);
340
341         } else if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id)) {
342                 disconnected = TRUE;
343         } else {
344                 data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len);
345
346                 __run_callback(client_pid, raw_data->call_id, data_in, &data_out);
347                 pims_ipc_data_destroy(data_in);
348         }
349
350         if (data_out) {
351                 __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result);
352                 pims_ipc_data_destroy(data_out);
353         } else
354                 __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result);
355
356         if (worker_data->client_fd != -1)
357                 __send_raw_data(worker_data->client_fd, raw_data->client_id, result);
358         worker_free_raw_data(raw_data);
359         worker_free_raw_data(result);
360
361         return disconnected;
362 }
363
364 static void* __worker_loop(void *data)
365 {
366         int ret;
367         pthread_t pid;
368         int worker_fd;
369         int disconnected = FALSE;
370         pims_ipc_worker_data_s *worker_data = data;
371
372         RETV_IF(NULL == data, NULL);
373
374         worker_fd = eventfd(0, 0);
375         if (worker_fd == -1)
376                 return NULL;
377
378         INFO("worker Created ********** worker_fd = %d ***********", worker_fd);
379
380         pid = pthread_self();
381         worker_data->client_fd = -1;
382         worker_data->stop_thread = FALSE;
383         pthread_mutex_lock(&worker_data->ready_mutex);
384         worker_data->fd = worker_fd;
385         pthread_cond_signal(&worker_data->ready);
386         pthread_mutex_unlock(&worker_data->ready_mutex);
387
388         struct pollfd pollfds[1];
389         pollfds[0].fd = worker_fd;
390         pollfds[0].events = POLLIN;
391
392         while (!worker_data->stop_thread) {
393                 ret = poll(pollfds, 1, 3000); /* waiting command from router */
394                 if (-1 == ret) {
395                         if (errno != EINTR)
396                                 ERR("poll() Fail(%d)", errno);
397                         continue;
398                 }
399                 if (worker_data->stop_thread)
400                         break;
401
402                 if (0 == ret)
403                         continue;
404
405                 if (pollfds[0].revents & POLLIN) {
406                         uint64_t dummy;
407                         read_command(pollfds[0].fd, &dummy);
408
409                         disconnected = __worker_loop_handle_raw_data(worker_data);
410                 }
411         }
412
413         if (!disconnected)
414                 ERR("client fd closed, worker_fd : %d", worker_fd);
415         INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd);
416
417         int client_pid = _get_pid_from_fd(worker_data->client_fd);
418
419         worker_free_data(worker_data);
420         close(worker_fd);
421
422         /*      pthread_mutex_lock(&worker_data->client_mutex); */
423         g_hash_table_remove(worker_client_info_map, GINT_TO_POINTER(client_pid));
424         DBG("----------------removed(%d)", client_pid);
425         /*      pthread_mutex_unlock(&worker_data->client_mutex); */
426
427         if (_client_disconnected_cb.callback)
428                 _client_disconnected_cb.callback((pims_ipc_h)pid, _client_disconnected_cb.user_data);
429
430         return NULL;
431 }
432
433 void worker_start_idle_worker(pims_ipc_svc_s *ipc_data)
434 {
435         int i;
436         pims_ipc_worker_data_s *worker_data;
437
438         for (i = g_list_length(idle_worker_pool); i < ipc_data->workers_max_count; i++) {
439                 worker_data = calloc(1, sizeof(pims_ipc_worker_data_s));
440                 if (NULL == worker_data) {
441                         ERR("calloc() Fail(%d)", errno);
442                         continue;
443                 }
444                 pthread_mutex_init(&worker_data->queue_mutex, 0);
445                 pthread_mutex_init(&worker_data->ready_mutex, NULL);
446                 pthread_cond_init(&worker_data->ready, NULL);
447                 /* pthread_mutex_init(&worker_data->client_mutex, 0); */
448
449                 utils_launch_thread(__worker_loop, worker_data);
450                 idle_worker_pool = g_list_append(idle_worker_pool, worker_data);
451         }
452 }
453
454 void worker_stop_idle_worker()
455 {
456         GList *cursor;
457         pims_ipc_worker_data_s *worker_data;
458
459         cursor = idle_worker_pool;
460         while (cursor) {
461                 worker_data = cursor->data;
462                 worker_data->stop_thread = TRUE;
463                 write_command(worker_data->fd, 1);
464                 cursor = cursor->next;
465         }
466 }
467
468 void worker_init()
469 {
470         worker_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
471         WARN_IF(NULL == worker_cb_table, "worker cb table is NULL");
472 }
473
474 void worker_deinit()
475 {
476         g_list_free(idle_worker_pool);
477         idle_worker_pool = NULL;
478
479         g_hash_table_destroy(worker_cb_table);
480         worker_cb_table = NULL;
481 }
482
483 API void pims_ipc_svc_set_client_disconnected_cb(
484                 pims_ipc_svc_client_disconnected_cb callback, void *user_data)
485 {
486         if (_client_disconnected_cb.callback) {
487                 ERR("already registered");
488                 return;
489         }
490         _client_disconnected_cb.callback = callback;
491         _client_disconnected_cb.user_data = user_data;
492 }
493
494 void client_destroy_info(gpointer p)
495 {
496         pims_ipc_client_info_s *client_info = p;
497
498         if (NULL == client_info)
499                 return;
500         free(client_info->smack);
501         free(client_info->uid);
502         free(client_info->client_session);
503         free(client_info);
504 }
505
506 void client_init(void)
507 {
508         unique_sequence_number = 0;
509         worker_client_info_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
510                         client_destroy_info);
511 }
512
513 void client_deinit(void)
514 {
515         DBG("----------destroied");
516         g_hash_table_destroy(worker_client_info_map);
517 }
518
519 static int _create_client_info(int fd, pims_ipc_client_info_s **p_client_info)
520 {
521         int ret;
522         pid_t pid;
523         char errmsg[1024] = {0};
524
525         pims_ipc_client_info_s *client_info = calloc(1, sizeof(pims_ipc_client_info_s));
526         if (NULL == client_info) {
527                 ERR("calloc() return NULL");
528                 return -1;
529         }
530
531         ret = cynara_creds_socket_get_client(fd, CLIENT_METHOD_SMACK, &(client_info->smack));
532         if (CYNARA_API_SUCCESS != ret) {
533                 cynara_strerror(ret, errmsg, sizeof(errmsg));
534                 ERR("cynara_creds_socket_get_client() Fail(%d,%s)", ret, errmsg);
535                 client_destroy_info(client_info);
536                 return -1;
537         }
538
539         ret = cynara_creds_socket_get_user(fd, USER_METHOD_UID, &(client_info->uid));
540         if (CYNARA_API_SUCCESS != ret) {
541                 cynara_strerror(ret, errmsg, sizeof(errmsg));
542                 ERR("cynara_creds_socket_get_user() Fail(%d,%s)", ret, errmsg);
543                 client_destroy_info(client_info);
544                 return -1;
545         }
546
547         ret = cynara_creds_socket_get_pid(fd, &pid);
548         if (CYNARA_API_SUCCESS != ret) {
549                 cynara_strerror(ret, errmsg, sizeof(errmsg));
550                 ERR("cynara_creds_socket_get_pid() Fail(%d,%s)", ret, errmsg);
551                 client_destroy_info(client_info);
552                 return -1;
553         }
554
555         client_info->client_session = cynara_session_from_pid(pid);
556         if (NULL == client_info->client_session) {
557                 ERR("cynara_session_from_pid() return NULL");
558                 client_destroy_info(client_info);
559                 return -1;
560         }
561         *p_client_info = client_info;
562
563         return 0;
564 }
565
566 int client_register_info(int client_fd, int client_pid)
567 {
568         pims_ipc_client_info_s *client_info = NULL;
569         int ret = 0;
570
571         ret = _create_client_info(client_fd, &client_info);
572         if (ret < 0) {
573                 ERR("_create_client_info() Fail(%d)", ret);
574                 return -1;
575         }
576         g_hash_table_insert(worker_client_info_map, GINT_TO_POINTER(client_pid), client_info);
577         DBG("-------inserted:pid(%d), info(%p)", client_pid, client_info);
578
579         return 0;
580 }
581
582 int client_get_unique_sequence_number(void)
583 {
584         return unique_sequence_number++;
585 }
586
587 pims_ipc_client_info_s* client_clone_info(pims_ipc_client_info_s *client_info)
588 {
589         if (NULL == client_info) {
590                 ERR("client_info is NULL");
591                 return NULL;
592         }
593
594         pims_ipc_client_info_s *clone = calloc(1, sizeof(pims_ipc_client_info_s));
595         if (NULL == clone) {
596                 ERR("calloc() Fail");
597                 return NULL;
598         }
599
600         if (client_info->smack) {
601                 clone->smack = strdup(client_info->smack);
602                 if (NULL == clone->smack) {
603                         ERR("strdup() Fail");
604                         client_destroy_info(clone);
605                         return NULL;
606                 }
607         }
608
609         if (client_info->uid) {
610                 clone->uid = strdup(client_info->uid);
611                 if (NULL == clone->uid) {
612                         ERR("strdup() Fail");
613                         client_destroy_info(clone);
614                         return NULL;
615                 }
616         }
617
618         if (client_info->client_session) {
619                 clone->client_session = strdup(client_info->client_session);
620                 if (NULL == clone->client_session) {
621                         ERR("strdup() Fail");
622                         client_destroy_info(clone);
623                         return NULL;
624                 }
625         }
626
627         return clone;
628 }
629
630 pims_ipc_client_info_s* client_get_info(int client_pid)
631 {
632         pims_ipc_client_info_s *client_info = NULL;
633
634         client_info = g_hash_table_lookup(worker_client_info_map, GINT_TO_POINTER(client_pid));
635         DBG("---------------get client_pid(%d)", client_pid);
636         return client_info;
637 }
638