[issue] Get invalid client_info when two different threads try to connect.
[platform/core/pim/pims-ipc.git] / src / pims-ipc-worker.c
1 /*
2  * PIMS IPC
3  *
4  * Copyright (c) 2012 - 2016 Samsung Electronics Co., Ltd. All rights reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the License);
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an AS IS BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 #include <pthread.h>
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <poll.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <sys/eventfd.h>
26 #include <fcntl.h>
27 #include <glib.h>
28
29 #include "pims-internal.h"
30 #include "pims-ipc-data-internal.h"
31 #include "pims-ipc-data.h"
32 #include "pims-socket.h"
33 #include "pims-ipc-utils.h"
34 #include "pims-ipc-worker.h"
35
/* Upper bound used by worker_wait_idle_worker_ready() while waiting for a worker's fd. */
#define PIMS_IPC_WORKER_THREAD_WAIT_TIME 100 /* milliseconds */

/* Pairs a client-disconnected callback with the opaque pointer handed back to it. */
typedef struct {
        pims_ipc_svc_client_disconnected_cb callback; /* called by __worker_loop() just before the thread returns */
        void * user_data; /* forwarded unchanged as the callback's last argument */
} pims_ipc_svc_client_disconnected_cb_t;
42
/* idle_worker_pool must only be handled on the main thread (no lock protects it) */
static GList *idle_worker_pool;
static GHashTable *worker_cb_table; /* key : call_id, data : pims_ipc_svc_cb_s* (both g_free'd by the table) */
/* Thread-local storage: a registration is only visible to the thread that made it */
static __thread pims_ipc_svc_client_disconnected_cb_t _client_disconnected_cb = {NULL, NULL};

static int unique_sequence_number;
/* key : client pid (GINT_TO_POINTER), data : pims_ipc_client_info_s* (freed by client_destroy_info) */
static GHashTable *worker_client_info_map;
static int client_register_info(pims_ipc_worker_data_s *worker_data, int client_pid);
51
52 int worker_wait_idle_worker_ready(pims_ipc_worker_data_s *worker_data)
53 {
54         struct timespec timeout = {0};
55
56         clock_gettime(CLOCK_REALTIME, &timeout);
57         timeout.tv_nsec += PIMS_IPC_WORKER_THREAD_WAIT_TIME * 1000000;
58         timeout.tv_sec += timeout.tv_nsec / 1000000000L;
59         timeout.tv_nsec = timeout.tv_nsec % 1000000000L;
60
61         pthread_mutex_lock(&worker_data->ready_mutex);
62
63         if (!worker_data->fd) {
64                 WARN("worker fd is null, wait until worker thread create done.");
65                 if (pthread_cond_timedwait(&worker_data->ready, &worker_data->ready_mutex, &timeout)) {
66                         ERR("Get idle worker timeout Fail!");
67                         pthread_mutex_unlock(&worker_data->ready_mutex);
68                         return -1;
69                 }
70         }
71
72         pthread_mutex_unlock(&worker_data->ready_mutex);
73         return 0;
74 }
75
76 pims_ipc_worker_data_s* worker_get_idle_worker(pims_ipc_svc_s *ipc_svc,
77                 const char *client_id)
78 {
79         pims_ipc_worker_data_s *worker_data;
80
81         RETV_IF(NULL == client_id, NULL);
82         RETVM_IF(NULL == idle_worker_pool, NULL, "There is no idle worker");
83
84         worker_data = g_hash_table_lookup(ipc_svc->client_worker_map, client_id);
85         if (worker_data)
86                 return worker_data;
87
88         worker_data = idle_worker_pool->data;
89
90         idle_worker_pool = g_list_delete_link(idle_worker_pool, idle_worker_pool);
91
92         if (worker_data)
93                 g_hash_table_insert(ipc_svc->client_worker_map, g_strdup(client_id), worker_data);
94
95         return worker_data;
96 }
97
98 pims_ipc_worker_data_s* worker_find(pims_ipc_svc_s *ipc_svc, const char *client_id)
99 {
100         pims_ipc_worker_data_s *worker_data;
101
102         RETV_IF(NULL == client_id, NULL);
103
104         if (FALSE == g_hash_table_lookup_extended(ipc_svc->client_worker_map, client_id,
105                                 NULL, (gpointer*)&worker_data)) {
106                 ERR("g_hash_table_lookup_extended(%s) Fail", client_id);
107                 return NULL;
108         }
109
110         return worker_data;
111 }
112
113 void worker_stop_client_worker(pims_ipc_svc_s *ipc_svc, const char *client_id)
114 {
115         pims_ipc_worker_data_s *worker_data;
116
117         worker_data = worker_find(ipc_svc, client_id);
118
119         /* remove client_fd */
120         g_hash_table_remove(ipc_svc->client_worker_map, client_id);
121
122         /* stop worker thread */
123         if (worker_data) {
124                 worker_data->stop_thread = TRUE;
125                 worker_data->client_fd = -1;
126                 write_command(worker_data->fd, 1);
127                 DBG("write command to worker terminate(worker_fd:%d)", worker_data->fd);
128         }
129 }
130
131
132 void worker_free_raw_data(void *data)
133 {
134         pims_ipc_raw_data_s *raw_data = data;
135
136         if (NULL == raw_data)
137                 return;
138
139         free(raw_data->client_id);
140         free(raw_data->call_id);
141         free(raw_data->data);
142         free(raw_data);
143 }
144
145 void worker_free_data(gpointer data)
146 {
147         pims_ipc_worker_data_s *worker_data = data;
148
149         pthread_mutex_lock(&worker_data->queue_mutex);
150         if (worker_data->list)
151                 g_list_free_full(worker_data->list, worker_free_raw_data);
152         pthread_mutex_unlock(&worker_data->queue_mutex);
153
154         pthread_mutex_destroy(&worker_data->client_mutex);
155
156         pthread_cond_destroy(&worker_data->ready);
157         pthread_mutex_destroy(&worker_data->ready_mutex);
158
159         free(worker_data);
160 }
161
162
163 int worker_push_raw_data(pims_ipc_worker_data_s *worker_data, int client_fd,
164                 pims_ipc_raw_data_s *data)
165 {
166         pthread_mutex_lock(&worker_data->queue_mutex);
167         worker_data->list = g_list_append(worker_data->list, data);
168         worker_data->client_fd = client_fd;
169         pthread_mutex_unlock(&worker_data->queue_mutex);
170
171         return TRUE;
172 }
173
174 static gboolean worker_pop_raw_data(pims_ipc_worker_data_s *worker,
175                 pims_ipc_raw_data_s **data)
176 {
177         if (!worker)
178                 return FALSE;
179
180         pthread_mutex_lock(&worker->queue_mutex);
181         if (!worker->list) {
182                 pthread_mutex_unlock(&worker->queue_mutex);
183                 *data = NULL;
184                 return FALSE;
185         }
186
187         *data = g_list_first(worker->list)->data;
188         worker->list = g_list_delete_link(worker->list, g_list_first(worker->list));
189         pthread_mutex_unlock(&worker->queue_mutex);
190
191         return TRUE;
192 }
193
194 int worker_set_callback(char *call_id, pims_ipc_svc_cb_s *cb_data)
195 {
196         return g_hash_table_insert(worker_cb_table, call_id, cb_data);
197 }
198
199
200 static void __run_callback(int client_pid, char *call_id, pims_ipc_data_h dhandle_in,
201                 pims_ipc_data_h *dhandle_out)
202 {
203         pims_ipc_svc_cb_s *cb_data = NULL;
204
205         VERBOSE("Call id [%s]", call_id);
206
207         cb_data = g_hash_table_lookup(worker_cb_table, call_id);
208         if (cb_data == NULL) {
209                 VERBOSE("No Data for %s", call_id);
210                 return;
211         }
212
213         /* TODO: client_pid is not valide pims_ipc_h */
214         cb_data->callback((pims_ipc_h)client_pid, dhandle_in, dhandle_out, cb_data->user_data);
215 }
216
217 static void __make_raw_data(const char *call_id, int seq_no, pims_ipc_data_h data,
218                 pims_ipc_raw_data_s **out)
219 {
220         pims_ipc_data_s *data_in = data;
221         pims_ipc_raw_data_s *raw_data = NULL;
222
223         RET_IF(NULL == out);
224         RET_IF(NULL == call_id);
225
226         raw_data = calloc(1, sizeof(pims_ipc_raw_data_s));
227         if (NULL == raw_data) {
228                 ERR("calloc() Fail(%d)", errno);
229                 return;
230         }
231
232         raw_data->call_id = g_strdup(call_id);
233         raw_data->call_id_len = strlen(raw_data->call_id);
234         raw_data->seq_no = seq_no;
235
236         if (data_in && 0 < data_in->buf_size) {
237                 raw_data->has_data = TRUE;
238                 raw_data->data = calloc(1, data_in->buf_size+1);
239                 if (NULL == raw_data->data) {
240                         ERR("calloc() Fail");
241                         free(raw_data->call_id);
242                         free(raw_data);
243                         return;
244                 }
245                 memcpy(raw_data->data, data_in->buf, data_in->buf_size);
246                 raw_data->data_len = data_in->buf_size;
247         } else {
248                 raw_data->has_data = FALSE;
249                 raw_data->data_len = 0;
250                 raw_data->data = NULL;
251         }
252         *out = raw_data;
253         return;
254 }
255
256 static int __send_raw_data(int fd, const char *client_id, pims_ipc_raw_data_s *data)
257 {
258         int ret = 0;
259         unsigned int len, total_len, client_id_len;
260
261         RETV_IF(NULL == data, -1);
262         RETV_IF(NULL == client_id, -1);
263
264         client_id_len = strlen(client_id);
265
266         len = sizeof(total_len) + sizeof(client_id_len) + client_id_len + sizeof(data->seq_no)
267                 + data->call_id_len + sizeof(data->call_id) + sizeof(data->has_data);
268         total_len = len;
269
270         if (data->has_data) {
271                 len += sizeof(data->data_len);
272                 total_len = len + data->data_len;
273         }
274
275         INFO("client_id: %s, call_id : %s, seq no :%d, len:%d, total len :%d", client_id,
276                         data->call_id, data->seq_no, len, total_len);
277
278         int length = 0;
279         char buf[len+1];
280         memset(buf, 0x0, len+1);
281
282         memcpy(buf, &total_len, sizeof(total_len));
283         length += sizeof(total_len);
284
285         memcpy(buf+length, &client_id_len, sizeof(client_id_len));
286         length += sizeof(client_id_len);
287         memcpy(buf+length, client_id, client_id_len);
288         length += client_id_len;
289
290         memcpy(buf+length, &(data->seq_no), sizeof(data->seq_no));
291         length += sizeof(data->seq_no);
292
293         memcpy(buf+length, &(data->call_id_len), sizeof(data->call_id_len));
294         length += sizeof(data->call_id_len);
295         memcpy(buf+length, data->call_id, data->call_id_len);
296         length += data->call_id_len;
297
298         memcpy(buf+length, &(data->has_data), sizeof(data->has_data));
299         length += sizeof(data->has_data);
300
301         if (data->has_data) {
302                 memcpy(buf+length, &(data->data_len), sizeof(data->data_len));
303                 length += sizeof(data->data_len);
304                 ret = socket_send(fd, buf, length);
305
306                 /* send data */
307                 if (ret > 0)
308                         ret += socket_send_data(fd, data->data, data->data_len);
309         } else {
310                 ret = socket_send(fd, buf, length);
311         }
312
313         return ret;
314 }
315
316 static int _get_pid_from_fd(int fd)
317 {
318         struct ucred uc;
319         socklen_t uc_len = sizeof(uc);
320
321         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &uc_len) < 0)
322                 ERR("getsockopt() Failed(%d)", errno);
323
324         DBG("Client PID(%d)", uc.pid);
325         return uc.pid;
326 }
327
328 static int __worker_loop_handle_raw_data(pims_ipc_worker_data_s *worker_data)
329 {
330         int disconnected = FALSE;
331         pims_ipc_data_h data_in = NULL;
332         pims_ipc_data_h data_out = NULL;
333         pims_ipc_raw_data_s *result = NULL;
334         pims_ipc_raw_data_s *raw_data = NULL;
335
336         if (FALSE == worker_pop_raw_data(worker_data, &raw_data))
337                 return disconnected;
338
339         int client_pid = _get_pid_from_fd(worker_data->client_fd);
340
341         if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_CREATE, raw_data->call_id)) {
342                 client_register_info(worker_data, client_pid);
343
344         } else if (UTILS_STR_EQUAL == strcmp(PIMS_IPC_CALL_ID_DESTROY, raw_data->call_id)) {
345                 disconnected = TRUE;
346         } else {
347                 data_in = pims_ipc_data_steal_unmarshal(raw_data->data, raw_data->data_len);
348
349                 __run_callback(client_pid, raw_data->call_id, data_in, &data_out);
350                 pims_ipc_data_destroy(data_in);
351         }
352
353         if (data_out) {
354                 __make_raw_data(raw_data->call_id, raw_data->seq_no, data_out, &result);
355                 pims_ipc_data_destroy(data_out);
356         } else
357                 __make_raw_data(raw_data->call_id, raw_data->seq_no, NULL, &result);
358
359         if (worker_data->client_fd != -1)
360                 __send_raw_data(worker_data->client_fd, raw_data->client_id, result);
361         worker_free_raw_data(raw_data);
362         worker_free_raw_data(result);
363
364         return disconnected;
365 }
366
367 static void* __worker_loop(void *data)
368 {
369         int ret;
370         pthread_t pid;
371         int worker_fd;
372         int disconnected = FALSE;
373         pims_ipc_worker_data_s *worker_data = data;
374
375         RETV_IF(NULL == data, NULL);
376
377         worker_fd = eventfd(0, 0);
378         if (worker_fd == -1)
379                 return NULL;
380
381         INFO("worker Created ********** worker_fd = %d ***********", worker_fd);
382
383         pid = pthread_self();
384         worker_data->client_fd = -1;
385         worker_data->stop_thread = FALSE;
386         pthread_mutex_lock(&worker_data->ready_mutex);
387         worker_data->fd = worker_fd;
388         pthread_cond_signal(&worker_data->ready);
389         pthread_mutex_unlock(&worker_data->ready_mutex);
390
391         struct pollfd pollfds[1];
392         pollfds[0].fd = worker_fd;
393         pollfds[0].events = POLLIN;
394         pollfds[0].revents = 0;
395
396         while (!worker_data->stop_thread) {
397                 ret = poll(pollfds, 1, 3000); /* waiting command from router */
398                 if (-1 == ret) {
399                         if (errno != EINTR)
400                                 ERR("poll() Fail(%d)", errno);
401                         continue;
402                 }
403                 if (worker_data->stop_thread)
404                         break;
405
406                 if (0 == ret)
407                         continue;
408
409                 if (pollfds[0].revents & POLLIN) {
410                         uint64_t dummy;
411                         read_command(pollfds[0].fd, &dummy);
412
413                         disconnected = __worker_loop_handle_raw_data(worker_data);
414                 }
415         }
416
417         if (!disconnected)
418                 ERR("client fd closed, worker_fd : %d", worker_fd);
419         INFO("task thread terminated --------------------------- (worker_fd : %d)", worker_fd);
420
421         int flag = fcntl(worker_data->client_fd, F_GETFL, 0);
422         if (0 == (FD_CLOEXEC & flag)) {
423                 int client_pid = _get_pid_from_fd(worker_data->client_fd);
424                 pthread_mutex_lock(&worker_data->client_mutex);
425                 g_hash_table_remove(worker_client_info_map, GINT_TO_POINTER(client_pid));
426                 DBG("client pid(%u) is removed", client_pid);
427                 pthread_mutex_unlock(&worker_data->client_mutex);
428         } else {
429                 DBG("fd(%d) is already closed", worker_data->client_fd);
430         }
431
432         worker_free_data(worker_data);
433         close(worker_fd);
434
435         if (_client_disconnected_cb.callback)
436                 _client_disconnected_cb.callback((pims_ipc_h)pid, _client_disconnected_cb.user_data);
437
438         return NULL;
439 }
440
441 void worker_start_idle_worker(pims_ipc_svc_s *ipc_data)
442 {
443         int i;
444         pims_ipc_worker_data_s *worker_data;
445
446         for (i = g_list_length(idle_worker_pool); i < ipc_data->workers_max_count; i++) {
447                 worker_data = calloc(1, sizeof(pims_ipc_worker_data_s));
448                 if (NULL == worker_data) {
449                         ERR("calloc() Fail(%d)", errno);
450                         continue;
451                 }
452                 pthread_mutex_init(&worker_data->queue_mutex, 0);
453                 pthread_mutex_init(&worker_data->ready_mutex, NULL);
454                 pthread_cond_init(&worker_data->ready, NULL);
455                 pthread_mutex_init(&worker_data->client_mutex, 0);
456
457                 utils_launch_thread(__worker_loop, worker_data);
458                 idle_worker_pool = g_list_append(idle_worker_pool, worker_data);
459         }
460 }
461
462 void worker_stop_idle_worker()
463 {
464         GList *cursor;
465         pims_ipc_worker_data_s *worker_data;
466
467         cursor = idle_worker_pool;
468         while (cursor) {
469                 worker_data = cursor->data;
470                 worker_data->stop_thread = TRUE;
471                 write_command(worker_data->fd, 1);
472                 cursor = cursor->next;
473         }
474 }
475
476 void worker_init()
477 {
478         worker_cb_table = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
479         WARN_IF(NULL == worker_cb_table, "worker cb table is NULL");
480 }
481
482 void worker_deinit()
483 {
484         g_list_free(idle_worker_pool);
485         idle_worker_pool = NULL;
486
487         g_hash_table_destroy(worker_cb_table);
488         worker_cb_table = NULL;
489 }
490
491 API void pims_ipc_svc_set_client_disconnected_cb(
492                 pims_ipc_svc_client_disconnected_cb callback, void *user_data)
493 {
494         if (_client_disconnected_cb.callback) {
495                 ERR("already registered");
496                 return;
497         }
498         _client_disconnected_cb.callback = callback;
499         _client_disconnected_cb.user_data = user_data;
500 }
501
502 void client_destroy_info(gpointer p)
503 {
504         pims_ipc_client_info_s *client_info = p;
505
506         if (NULL == client_info)
507                 return;
508         free(client_info->smack);
509         free(client_info->uid);
510         free(client_info->client_session);
511         free(client_info);
512 }
513
514 void client_init(void)
515 {
516         unique_sequence_number = 0;
517         worker_client_info_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
518                         client_destroy_info);
519 }
520
521 void client_deinit(void)
522 {
523         DBG("----------destroied");
524         g_hash_table_destroy(worker_client_info_map);
525 }
526
527 static int _create_client_info(int fd, pims_ipc_client_info_s **p_client_info)
528 {
529         int ret;
530         pid_t pid;
531         char errmsg[1024] = {0};
532
533         pims_ipc_client_info_s *client_info = calloc(1, sizeof(pims_ipc_client_info_s));
534         if (NULL == client_info) {
535                 ERR("calloc() return NULL");
536                 return -1;
537         }
538
539         ret = cynara_creds_socket_get_client(fd, CLIENT_METHOD_SMACK, &(client_info->smack));
540         if (CYNARA_API_SUCCESS != ret) {
541                 cynara_strerror(ret, errmsg, sizeof(errmsg));
542                 ERR("cynara_creds_socket_get_client() Fail(%d,%s)", ret, errmsg);
543                 client_destroy_info(client_info);
544                 return -1;
545         }
546
547         ret = cynara_creds_socket_get_user(fd, USER_METHOD_UID, &(client_info->uid));
548         if (CYNARA_API_SUCCESS != ret) {
549                 cynara_strerror(ret, errmsg, sizeof(errmsg));
550                 ERR("cynara_creds_socket_get_user() Fail(%d,%s)", ret, errmsg);
551                 client_destroy_info(client_info);
552                 return -1;
553         }
554
555         ret = cynara_creds_socket_get_pid(fd, &pid);
556         if (CYNARA_API_SUCCESS != ret) {
557                 cynara_strerror(ret, errmsg, sizeof(errmsg));
558                 ERR("cynara_creds_socket_get_pid() Fail(%d,%s)", ret, errmsg);
559                 client_destroy_info(client_info);
560                 return -1;
561         }
562
563         client_info->client_session = cynara_session_from_pid(pid);
564         if (NULL == client_info->client_session) {
565                 ERR("cynara_session_from_pid() return NULL");
566                 client_destroy_info(client_info);
567                 return -1;
568         }
569         *p_client_info = client_info;
570
571         return 0;
572 }
573
574 static int client_register_info(pims_ipc_worker_data_s *worker_data, int client_pid)
575 {
576         pims_ipc_client_info_s *client_info = NULL;
577         int ret = 0;
578
579         ret = _create_client_info(worker_data->client_fd, &client_info);
580         if (ret < 0) {
581                 ERR("_create_client_info() Fail(%d)", ret);
582                 return -1;
583         }
584         pthread_mutex_lock(&worker_data->client_mutex);
585         g_hash_table_insert(worker_client_info_map, GINT_TO_POINTER(client_pid), client_info);
586         DBG("-------inserted:pid(%d), info(%p)", client_pid, client_info);
587         pthread_mutex_unlock(&worker_data->client_mutex);
588
589         return 0;
590 }
591
592 int client_get_unique_sequence_number(void)
593 {
594         return unique_sequence_number++;
595 }
596
597 pims_ipc_client_info_s* client_clone_info(pims_ipc_client_info_s *client_info)
598 {
599         if (NULL == client_info) {
600                 ERR("client_info is NULL");
601                 return NULL;
602         }
603
604         pims_ipc_client_info_s *clone = calloc(1, sizeof(pims_ipc_client_info_s));
605         if (NULL == clone) {
606                 ERR("calloc() Fail");
607                 return NULL;
608         }
609
610         if (client_info->smack) {
611                 clone->smack = strdup(client_info->smack);
612                 if (NULL == clone->smack) {
613                         ERR("strdup() Fail");
614                         client_destroy_info(clone);
615                         return NULL;
616                 }
617         }
618
619         if (client_info->uid) {
620                 clone->uid = strdup(client_info->uid);
621                 if (NULL == clone->uid) {
622                         ERR("strdup() Fail");
623                         client_destroy_info(clone);
624                         return NULL;
625                 }
626         }
627
628         if (client_info->client_session) {
629                 clone->client_session = strdup(client_info->client_session);
630                 if (NULL == clone->client_session) {
631                         ERR("strdup() Fail");
632                         client_destroy_info(clone);
633                         return NULL;
634                 }
635         }
636
637         return clone;
638 }
639
640 pims_ipc_client_info_s* client_get_info(int client_pid)
641 {
642         pims_ipc_client_info_s *client_info = NULL;
643
644         client_info = g_hash_table_lookup(worker_client_info_map, GINT_TO_POINTER(client_pid));
645         DBG("---------------get client_pid(%d)", client_pid);
646         return client_info;
647 }
648