2 * Copyright 2013 Samsung Electronics Co., Ltd
4 * Licensed under the Flora License, Version 1.1 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://floralicense.org/license/
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <secure_socket.h>
23 #include <sys/types.h>
24 #include <sys/timerfd.h>
32 #include "service_common.h"
/* Byte written to a TCB's ctrl_pipe by tcb_destroy()/tcb_teminate_all() to ask its client thread to stop. */
38 #define EVT_END_CH 'x'
/* Timeout handed to com_core_send() as its last argument; the unit is presumably seconds -- TODO confirm. */
39 #define DEFAULT_TIMEOUT 2.0f
/*
 * Descriptor of one event source watched by the server thread.
 * Only part of the definition is visible in this view; other members
 * (type, info.timer.fd, cbdata) are referenced elsewhere in the file.
 */
43 struct service_event_item {
/* Invoked by processing_timer_event() with the item's cbdata when its timer fd is readable. */
54 int (*event_cb)(struct service_context *svc_cx, void *data);
60 * Server information and global (only in this file-scope) variables are defined
62 struct service_context {
63 pthread_t server_thid; /*!< Server thread Id */
64 int fd; /*!< Server socket handle */
66 Eina_List *tcb_list; /*!< TCB list, list of every thread for client connections */
67 pthread_mutex_t tcb_list_lock; /*!< Taken around tcb_list updates in tcb_create(), tcb_destroy() and tcb_is_valid() */
69 Eina_List *packet_list; /*!< packet_info entries appended by client threads, consumed by the server thread */
70 pthread_mutex_t packet_list_lock; /*!< Taken around packet_list accesses */
71 int evt_pipe[PIPE_MAX]; /*!< Client threads write one byte per queued packet; the server thread reads it */
72 int tcb_pipe[PIPE_MAX]; /*!< Carries the TCB pointer of a finished client thread to the server thread */
74 int (*service_thread_main)(struct tcb *tcb, struct packet *packet, void *data); /*!< Called by the server thread per packet, and with a NULL packet when a TCB ends */
75 void *service_thread_data; /*!< Passed as the last argument of service_thread_main */
77 Eina_List *event_list; /*!< service_event_item entries (timers) added by service_common_add_timer() */
/* NOTE(review): the member below appears to belong to struct packet_info (see packet_info->packet), whose opening line is not visible in this view. */
82 struct packet *packet;
87 * Thread Control Block
88 * - The main server will create a thread for every client connection.
89 * When a new client connects to us, this TCB block will be allocated and initialized.
91 struct tcb { /* Thread control block */
92 struct service_context *svc_ctx; /*!< Service context this client thread belongs to */
93 pthread_t thid; /*!< Thread Id */
94 int fd; /*!< Connection handle */
96 int ctrl_pipe[PIPE_MAX]; /*!< Written with EVT_END_CH to stop the client thread, which select()s on the read end */
100 * Do services for clients
101 * Routing packets to destination processes.
/*!
 * \brief Thread entry point of the per-client packet pump (started by tcb_create()).
 *
 * \param[in] data The struct tcb of the client connection served by this thread.
 *
 * \note
 * Only part of the body is visible in this view. The visible part shows that
 * the thread waits in select() on the client socket and on the TCB control
 * pipe, receives a packet header and then the payload with
 * secure_socket_recv(), and hands each completed packet to the server thread
 * through svc_ctx->packet_list plus a one-byte notification on
 * svc_ctx->evt_pipe. At the end it detaches itself from packets that are
 * still queued and writes its own TCB pointer to svc_ctx->tcb_pipe.
 */
104 static void *client_packet_pump_main(void *data)
106 struct tcb *tcb = data;
107 struct service_context *svc_ctx = tcb->svc_ctx;
108 struct packet *packet = NULL;
112 int packet_offset = 0;
117 char evt_ch = EVT_CH;
124 struct packet_info *packet_info;
128 recv_state = RECV_INIT;
131 * To escape from the switch statement, we use this ret value
/* Watch both the client connection and the control pipe; no timeout is given to select(). */
135 FD_SET(tcb->fd, &set);
136 FD_SET(tcb->ctrl_pipe[PIPE_READ], &set);
137 fd = tcb->fd > tcb->ctrl_pipe[PIPE_READ] ? tcb->fd : tcb->ctrl_pipe[PIPE_READ];
138 ret = select(fd + 1, &set, NULL, NULL, NULL);
141 if (errno == EINTR) {
142 ErrPrint("INTERRUPTED\n");
146 ErrPrint("Error: %s\n", strerror(errno));
150 } else if (ret == 0) {
151 ErrPrint("Timeout\n");
/* A readable control pipe is the stop request written by tcb_destroy()/tcb_teminate_all(). */
158 if (FD_ISSET(tcb->ctrl_pipe[PIPE_READ], &set)) {
159 DbgPrint("Thread is canceled\n");
166 if (!FD_ISSET(tcb->fd, &set)) {
167 ErrPrint("Unexpected handler is toggled\n");
176 * Service!!! Receive packet & route packet
178 switch (recv_state) {
180 size = packet_header_size();
186 ErrPrint("Heap: %s\n", strerror(errno));
190 recv_state = RECV_HEADER;
191 /* Go through, don't break from here */
/* Ask only for the part of the header that has not been received yet. */
193 ret = secure_socket_recv(tcb->fd, ptr, size - recv_offset, &pid);
206 if (recv_offset == size) {
207 packet = packet_build(packet, packet_offset, ptr, size);
215 packet_offset += recv_offset;
/* From here on "size" is the payload size reported by the packet built from the header. */
217 size = packet_payload_size(packet);
219 recv_state = RECV_DONE;
224 recv_state = RECV_PAYLOAD;
229 ErrPrint("Heap: %s\n", strerror(errno));
235 ret = secure_socket_recv(tcb->fd, ptr, size - recv_offset, &pid);
248 if (recv_offset == size) {
249 packet = packet_build(packet, packet_offset, ptr, size);
257 packet_offset += recv_offset;
259 recv_state = RECV_DONE;
269 if (recv_state == RECV_DONE) {
271 * Push this packet to the packet list with TCB
272 * Then the service main function will get this.
274 packet_info = malloc(sizeof(*packet_info));
277 ErrPrint("Heap: %s\n", strerror(errno));
278 packet_destroy(packet);
282 packet_info->packet = packet;
283 packet_info->tcb = tcb;
285 CRITICAL_SECTION_BEGIN(&svc_ctx->packet_list_lock);
286 svc_ctx->packet_list = eina_list_append(svc_ctx->packet_list, packet_info);
287 CRITICAL_SECTION_END(&svc_ctx->packet_list_lock);
/* One byte on evt_pipe per queued packet wakes up the server thread. */
289 if (write(svc_ctx->evt_pipe[PIPE_WRITE], &evt_ch, sizeof(evt_ch)) != sizeof(evt_ch)) {
291 ErrPrint("Unable to write a pipe: %s\n", strerror(errno));
/* The notification failed: take the entry back out of the queue and drop the packet. */
292 CRITICAL_SECTION_BEGIN(&svc_ctx->packet_list_lock);
293 svc_ctx->packet_list = eina_list_remove(svc_ctx->packet_list, packet_info);
294 CRITICAL_SECTION_END(&svc_ctx->packet_list_lock);
296 packet_destroy(packet);
297 DbgFree(packet_info);
298 ErrPrint("Terminate thread: %p\n", tcb);
301 DbgPrint("Packet received: %d bytes\n", packet_offset);
302 recv_state = RECV_INIT;
/* Packets of this client that are still queued must no longer point at this TCB. */
310 CRITICAL_SECTION_BEGIN(&svc_ctx->packet_list_lock);
311 EINA_LIST_FOREACH(svc_ctx->packet_list, l, packet_info) {
312 if (packet_info->tcb == tcb) {
313 DbgPrint("Reset ptr of the TCB[%p] in the list of packet info\n", tcb);
314 packet_info->tcb = NULL;
317 CRITICAL_SECTION_END(&svc_ctx->packet_list_lock);
321 * Emit a signal to collect this TCB from the SERVER THREAD.
/* The pointer value itself is sent; server_main() reads sizeof(tcb) bytes from the other end. */
323 if (write(svc_ctx->tcb_pipe[PIPE_WRITE], &tcb, sizeof(tcb)) != sizeof(tcb)) {
324 ErrPrint("Unable to write pipe: %s\n", strerror(errno));
/*!
 * \brief Allocate a TCB for a new client connection and start its packet pump thread.
 *
 * \param[in] svc_ctx Service context the new TCB is attached to.
 * \param[in] fd      Connection handle of the client.
 *
 * \note
 * The visible part creates the control pipe with pipe2(), starts
 * client_packet_pump_main() and then appends the TCB to svc_ctx->tcb_list
 * under tcb_list_lock. Return statements and some assignments are not
 * visible in this view; server_main() treats a failure as "no TCB".
 */
334 static inline struct tcb *tcb_create(struct service_context *svc_ctx, int fd)
339 tcb = malloc(sizeof(*tcb));
341 ErrPrint("Heap: %s\n", strerror(errno));
345 if (pipe2(tcb->ctrl_pipe, O_NONBLOCK | O_CLOEXEC) < 0) {
346 ErrPrint("pipe2: %s\n", strerror(errno));
352 tcb->svc_ctx = svc_ctx;
353 tcb->type = TCB_CLIENT_TYPE_APP;
355 DbgPrint("Create a new service thread [%d]\n", fd);
356 status = pthread_create(&tcb->thid, NULL, client_packet_pump_main, tcb);
/* pthread_create() reports failure through its return value, hence strerror(status). */
358 ErrPrint("Unable to create a new thread: %s\n", strerror(status));
359 CLOSE_PIPE(tcb->ctrl_pipe);
364 CRITICAL_SECTION_BEGIN(&svc_ctx->tcb_list_lock);
365 svc_ctx->tcb_list = eina_list_append(svc_ctx->tcb_list, tcb);
366 CRITICAL_SECTION_END(&svc_ctx->tcb_list_lock);
/*!
 * \brief Stop and release every TCB that is still in svc_ctx->tcb_list.
 *
 * \param[in] svc_ctx Service context whose client threads are terminated.
 *
 * \note
 * For each TCB: write EVT_END_CH to its control pipe, join the thread,
 * destroy the connection handle and close the control pipe. The list is
 * walked without taking tcb_list_lock (see the original remark below).
 */
374 static inline void tcb_teminate_all(struct service_context *svc_ctx)
379 char ch = EVT_END_CH;
382 * We don't need to make critical section on here.
383 * If we call this after terminate the server thread first.
384 * Then there is no other thread to access tcb_list.
386 EINA_LIST_FREE(svc_ctx->tcb_list, tcb) {
388 * ASSERT(tcb->fd >= 0);
/* Wake the client thread out of select() so that the join below can complete. */
390 if (write(tcb->ctrl_pipe[PIPE_WRITE], &ch, sizeof(ch)) != sizeof(ch)) {
391 ErrPrint("write: %s\n", strerror(errno));
394 status = pthread_join(tcb->thid, &ret);
396 ErrPrint("Unable to join a thread: %s\n", strerror(status));
398 DbgPrint("Thread returns: %p\n", ret);
401 secure_socket_destroy_handle(tcb->fd);
403 CLOSE_PIPE(tcb->ctrl_pipe);
/*!
 * \brief Remove one TCB from the service context and release its resources.
 *
 * \param[in] svc_ctx Service context that owns the TCB list.
 * \param[in] tcb     TCB to destroy.
 *
 * \note
 * The TCB is unlinked from tcb_list under tcb_list_lock first; then the
 * client thread is asked to stop through the control pipe, joined, and the
 * connection handle and control pipe are closed.
 */
412 static inline void tcb_destroy(struct service_context *svc_ctx, struct tcb *tcb)
416 char ch = EVT_END_CH;
418 CRITICAL_SECTION_BEGIN(&svc_ctx->tcb_list_lock);
419 svc_ctx->tcb_list = eina_list_remove(svc_ctx->tcb_list, tcb);
420 CRITICAL_SECTION_END(&svc_ctx->tcb_list_lock);
422 * ASSERT(tcb->fd >= 0);
423 * Close the connection, and then collecting the return value of thread
425 if (write(tcb->ctrl_pipe[PIPE_WRITE], &ch, sizeof(ch)) != sizeof(ch)) {
426 ErrPrint("write: %s\n", strerror(errno));
429 status = pthread_join(tcb->thid, &ret);
431 ErrPrint("Unable to join a thread: %s\n", strerror(status));
433 DbgPrint("Thread returns: %p\n", ret);
436 secure_socket_destroy_handle(tcb->fd);
438 CLOSE_PIPE(tcb->ctrl_pipe);
/*!
 * \brief Register every descriptor the server thread waits on in \a set.
 *
 * \param[in]  svc_ctx Service context providing the descriptors.
 * \param[out] set     fd_set receiving the server socket, the read ends of
 *                     tcb_pipe and evt_pipe, and the fd of every timer item.
 *
 * \note
 * "fd" tracks the largest descriptor added. The return statement is not
 * visible in this view. NOTE(review): server_main() passes the result
 * directly as the nfds argument of select(), so this must return the
 * largest descriptor plus one -- confirm.
 */
446 static inline int update_fdset(struct service_context *svc_ctx, fd_set *set)
449 struct service_event_item *item;
454 FD_SET(svc_ctx->fd, set);
457 FD_SET(svc_ctx->tcb_pipe[PIPE_READ], set);
458 if (svc_ctx->tcb_pipe[PIPE_READ] > fd) {
459 fd = svc_ctx->tcb_pipe[PIPE_READ];
462 FD_SET(svc_ctx->evt_pipe[PIPE_READ], set);
463 if (svc_ctx->evt_pipe[PIPE_READ] > fd) {
464 fd = svc_ctx->evt_pipe[PIPE_READ];
/* Timer events are the only event type that contributes a descriptor here. */
467 EINA_LIST_FOREACH(svc_ctx->event_list, l, item) {
468 if (item->type == SERVICE_EVENT_TIMER) {
469 FD_SET(item->info.timer.fd, set);
470 if (fd < item->info.timer.fd) {
471 fd = item->info.timer.fd;
483 static inline void processing_timer_event(struct service_context *svc_ctx, fd_set *set)
485 uint64_t expired_count;
488 struct service_event_item *item;
490 EINA_LIST_FOREACH_SAFE(svc_ctx->event_list, l, n, item) {
491 switch (item->type) {
492 case SERVICE_EVENT_TIMER:
493 if (!FD_ISSET(item->info.timer.fd, set)) {
497 if (read(item->info.timer.fd, &expired_count, sizeof(expired_count)) == sizeof(expired_count)) {
498 DbgPrint("Expired %d times\n", expired_count);
499 if (item->event_cb(svc_ctx, item->cbdata) >= 0) {
503 ErrPrint("read: %s\n", strerror(errno));
506 if (!eina_list_data_find(svc_ctx->event_list, item)) {
510 svc_ctx->event_list = eina_list_remove(svc_ctx->event_list, item);
511 if (close(item->info.timer.fd) < 0) {
512 ErrPrint("close: %s\n", strerror(errno));
517 ErrPrint("Unknown event: %d\n", item->type);
524 * Accept new client connections
525 * And create a new thread for service.
527 * Create Client threads & Destroying them
/*!
 * \brief Entry point of the server thread created by service_common_create().
 *
 * \param[in] data The struct service_context served by this thread.
 *
 * \note
 * Only part of the body is visible in this view. The visible part waits in
 * select() on the descriptors collected by update_fdset() and then handles,
 * in this order: new connections on the server socket, queued packets
 * announced on evt_pipe, timer events, and finished TCBs announced on
 * tcb_pipe. After the wait loop it drains the remaining packets and
 * terminates every client thread.
 */
530 static void *server_main(void *data)
532 struct service_context *svc_ctx = data;
540 struct packet_info *packet_info;
542 DbgPrint("Server thread is activated\n");
544 fd = update_fdset(svc_ctx, &set);
545 memcpy(&except_set, &set, sizeof(set));
/* NOTE(review): the first argument of select() must be the highest descriptor plus one; this relies on update_fdset() returning that value -- confirm. */
547 ret = select(fd, &set, NULL, &except_set, NULL);
550 if (errno == EINTR) {
551 DbgPrint("INTERRUPTED\n");
554 ErrPrint("Error: %s\n", strerror(errno));
556 } else if (ret == 0) {
557 ErrPrint("Timeout\n");
/* A readable server socket means a client is waiting to be accepted. */
562 if (FD_ISSET(svc_ctx->fd, &set)) {
563 client_fd = secure_socket_get_connection_handle(svc_ctx->fd);
565 ErrPrint("Failed to establish a new connection [%d]\n", svc_ctx->fd);
570 tcb = tcb_create(svc_ctx, client_fd);
572 ErrPrint("Failed to create a new TCB: %d (%d)\n", client_fd, svc_ctx->fd);
573 secure_socket_destroy_handle(client_fd);
/* Each byte on evt_pipe stands for one packet_info queued by a client thread. */
577 if (FD_ISSET(svc_ctx->evt_pipe[PIPE_READ], &set)) {
578 if (read(svc_ctx->evt_pipe[PIPE_READ], &evt_ch, sizeof(evt_ch)) != sizeof(evt_ch)) {
579 ErrPrint("Unable to read pipe: %s\n", strerror(errno));
/* Take the oldest entry out of the queue; the callback runs outside the lock. */
584 CRITICAL_SECTION_BEGIN(&svc_ctx->packet_list_lock);
585 packet_info = eina_list_nth(svc_ctx->packet_list, 0);
586 svc_ctx->packet_list = eina_list_remove(svc_ctx->packet_list, packet_info);
587 CRITICAL_SECTION_END(&svc_ctx->packet_list_lock);
592 * What happens if the client thread is terminated, so the packet_info->tcb is deleted
593 * while processing svc_ctx->service_thread_main?
595 ret = svc_ctx->service_thread_main(packet_info->tcb, packet_info->packet, svc_ctx->service_thread_data);
597 ErrPrint("Service thread returns: %d\n", ret);
600 packet_destroy(packet_info->packet);
601 DbgFree(packet_info);
608 processing_timer_event(svc_ctx, &set);
612 * Destroying TCB should be processed at last.
614 if (FD_ISSET(svc_ctx->tcb_pipe[PIPE_READ], &set)) {
615 Eina_List *lockfree_packet_list;
/* The pipe carries raw TCB pointers written by client threads; service_common_destroy() also writes to it to stop this thread. */
619 if (read(svc_ctx->tcb_pipe[PIPE_READ], &tcb, sizeof(tcb)) != sizeof(tcb)) {
620 ErrPrint("Unable to read pipe: %s\n", strerror(errno));
626 ErrPrint("Terminate service thread\n");
/*
 * Move the packets of the finished TCB to a private list so that they can be
 * serviced without holding packet_list_lock.
 * NOTE(review): client_packet_pump_main() resets packet_info->tcb to NULL for
 * its queued packets before it writes to tcb_pipe, so this filter may find
 * nothing in that case -- confirm the intended behaviour.
 */
631 lockfree_packet_list = NULL;
632 CRITICAL_SECTION_BEGIN(&svc_ctx->packet_list_lock);
633 EINA_LIST_FOREACH_SAFE(svc_ctx->packet_list, l, n, packet_info) {
634 if (packet_info->tcb != tcb) {
638 svc_ctx->packet_list = eina_list_remove(svc_ctx->packet_list, packet_info);
639 lockfree_packet_list = eina_list_append(lockfree_packet_list, packet_info);
641 CRITICAL_SECTION_END(&svc_ctx->packet_list_lock);
643 EINA_LIST_FREE(lockfree_packet_list, packet_info) {
/* Consume the notification byte that was written for this packet. */
644 ret = read(svc_ctx->evt_pipe[PIPE_READ], &evt_ch, sizeof(evt_ch));
645 DbgPrint("Flushing filtered pipe: %d (%c)\n", ret, evt_ch);
646 ret = svc_ctx->service_thread_main(packet_info->tcb, packet_info->packet, svc_ctx->service_thread_data);
648 ErrPrint("Service thread returns: %d\n", ret);
650 packet_destroy(packet_info->packet);
651 DbgFree(packet_info);
656 * Invoke the service thread main, to notify the termination of a TCB
658 ret = svc_ctx->service_thread_main(tcb, NULL, svc_ctx->service_thread_data);
661 * at this time, the client thread can access this tcb.
662 * how can I protect this TCB from deletion without disturbing the server thread?
664 tcb_destroy(svc_ctx, tcb);
667 /* If there is no such triggered FD? */
671 * Consuming all pended packets before terminates server thread.
673 * If the server thread is terminated, we should flush all pended packets.
674 * And we should services them.
675 * While processing this routine, the mutex is locked.
676 * So every other client thread will be slowed down, sequently, every clients can meet problems.
677 * But in case of termination of server thread, there could be systemetic problem.
678 * This only should be happenes while terminating the master daemon process.
680 CRITICAL_SECTION_BEGIN(&svc_ctx->packet_list_lock);
681 EINA_LIST_FREE(svc_ctx->packet_list, packet_info) {
682 ret = read(svc_ctx->evt_pipe[PIPE_READ], &evt_ch, sizeof(evt_ch));
683 DbgPrint("Flushing pipe: %d (%c)\n", ret, evt_ch);
684 ret = svc_ctx->service_thread_main(packet_info->tcb, packet_info->packet, svc_ctx->service_thread_data);
686 ErrPrint("Service thread returns: %d\n", ret);
688 packet_destroy(packet_info->packet);
689 DbgFree(packet_info);
691 CRITICAL_SECTION_END(&svc_ctx->packet_list_lock);
693 tcb_teminate_all(svc_ctx);
701 HAPI struct service_context *service_common_create(const char *addr, int (*service_thread_main)(struct tcb *tcb, struct packet *packet, void *data), void *data)
704 struct service_context *svc_ctx;
706 if (!service_thread_main || !addr) {
707 ErrPrint("Invalid argument\n");
711 if (strncmp(addr, COM_CORE_REMOTE_SCHEME, strlen(COM_CORE_REMOTE_SCHEME))) {
714 offset = strlen(COM_CORE_LOCAL_SCHEME);
715 if (strncmp(addr, COM_CORE_LOCAL_SCHEME, offset)) {
719 if (unlink(addr + offset) < 0) {
720 ErrPrint("[%s] - %s\n", addr, strerror(errno));
724 svc_ctx = calloc(1, sizeof(*svc_ctx));
726 ErrPrint("Heap: %s\n", strerror(errno));
730 svc_ctx->fd = secure_socket_create_server(addr);
731 if (svc_ctx->fd < 0) {
736 svc_ctx->service_thread_main = service_thread_main;
737 svc_ctx->service_thread_data = data;
739 if (fcntl(svc_ctx->fd, F_SETFD, FD_CLOEXEC) < 0) {
740 ErrPrint("fcntl: %s\n", strerror(errno));
743 if (fcntl(svc_ctx->fd, F_SETFL, O_NONBLOCK) < 0) {
744 ErrPrint("fcntl: %s\n", strerror(errno));
747 if (pipe2(svc_ctx->evt_pipe, O_NONBLOCK | O_CLOEXEC) < 0) {
748 ErrPrint("pipe: %d\n", strerror(errno));
749 secure_socket_destroy_handle(svc_ctx->fd);
754 if (pipe2(svc_ctx->tcb_pipe, O_NONBLOCK | O_CLOEXEC) < 0) {
755 ErrPrint("pipe: %s\n", strerror(errno));
756 CLOSE_PIPE(svc_ctx->evt_pipe);
757 secure_socket_destroy_handle(svc_ctx->fd);
762 status = pthread_mutex_init(&svc_ctx->packet_list_lock, NULL);
764 ErrPrint("Unable to create a mutex: %s\n", strerror(status));
765 CLOSE_PIPE(svc_ctx->evt_pipe);
766 CLOSE_PIPE(svc_ctx->tcb_pipe);
767 secure_socket_destroy_handle(svc_ctx->fd);
772 DbgPrint("Creating server thread\n");
773 status = pthread_create(&svc_ctx->server_thid, NULL, server_main, svc_ctx);
775 ErrPrint("Unable to create a thread for shortcut service: %s\n", strerror(status));
776 status = pthread_mutex_destroy(&svc_ctx->packet_list_lock);
778 ErrPrint("Error: %s\n", strerror(status));
780 CLOSE_PIPE(svc_ctx->evt_pipe);
781 CLOSE_PIPE(svc_ctx->tcb_pipe);
782 secure_socket_destroy_handle(svc_ctx->fd);
789 * To give a chance to run for server thread.
/*!
 * \brief Stop the server thread of \a svc_ctx and release the context's resources.
 *
 * \param[in] svc_ctx Service context created by service_common_create().
 *
 * \note
 * The visible part writes to tcb_pipe to make the server thread leave its
 * loop, joins it, then destroys the server socket, packet_list_lock and both
 * pipes. Argument checks and the return value are not visible in this view.
 */
800 HAPI int service_common_destroy(struct service_context *svc_ctx)
811 * Terminate server thread
/* NOTE(review): sizeof(status) bytes are written here while server_main() reads sizeof(tcb) bytes from this pipe; the two sizes may differ -- confirm. */
813 if (write(svc_ctx->tcb_pipe[PIPE_WRITE], &status, sizeof(status)) != sizeof(status)) {
814 ErrPrint("Failed to write: %s\n", strerror(errno));
817 status = pthread_join(svc_ctx->server_thid, &ret);
819 ErrPrint("Join: %s\n", strerror(status));
821 DbgPrint("Thread returns: %p\n", ret);
824 secure_socket_destroy_handle(svc_ctx->fd);
826 status = pthread_mutex_destroy(&svc_ctx->packet_list_lock);
828 ErrPrint("Unable to destroy a mutex: %s\n", strerror(status));
831 CLOSE_PIPE(svc_ctx->evt_pipe);
832 CLOSE_PIPE(svc_ctx->tcb_pipe);
839 * SERVER THREAD or OTHER THREAD (not main)
/*!
 * \brief Check whether \a tcb is currently registered in svc_ctx->tcb_list.
 *
 * \param[in] svc_ctx Service context whose TCB list is searched.
 * \param[in] tcb     TCB pointer to look for; only the pointer value is compared.
 *
 * \note
 * The list is walked under tcb_list_lock. The value returned for a hit or a
 * miss is not visible in this view.
 */
841 HAPI int tcb_is_valid(struct service_context *svc_ctx, struct tcb *tcb)
847 CRITICAL_SECTION_BEGIN(&svc_ctx->tcb_list_lock);
848 EINA_LIST_FOREACH(svc_ctx->tcb_list, l, tmp) {
849 if (tmp == tcb /* && tcb->svc_ctx == svc_ctx */) {
854 CRITICAL_SECTION_END(&svc_ctx->tcb_list_lock);
/* Accessor taking a TCB; the body is not visible in this view. Presumably returns the connection handle tcb->fd -- TODO confirm. */
863 HAPI int tcb_fd(struct tcb *tcb)
/* Accessor taking a TCB; the body is not visible in this view. Presumably returns tcb->type -- TODO confirm. */
876 HAPI int tcb_client_type(struct tcb *tcb)
/*
 * Change the client type recorded for a TCB. Only the log line is visible in
 * this view; it prints the new type followed by the current tcb->type, so the
 * assignment presumably comes after it -- TODO confirm.
 */
889 HAPI int tcb_client_type_set(struct tcb *tcb, enum tcb_type type)
895 DbgPrint("TCB[%p] Client type is changed to %d from %d\n", tcb, type, tcb->type);
/* Accessor taking a TCB; the body is not visible in this view. Presumably returns tcb->svc_ctx -- TODO confirm. */
904 HAPI struct service_context *tcb_svc_ctx(struct tcb *tcb)
/*!
 * \brief Send one packet to the client connection of \a tcb.
 *
 * \param[in] tcb    Destination TCB; its fd is the target of the send.
 * \param[in] packet Packet whose raw bytes (packet_data/packet_size) are sent.
 *
 * \return The result of com_core_send(), called with DEFAULT_TIMEOUT. The
 *         value returned for a NULL argument is not visible in this view.
 */
917 HAPI int service_common_unicast_packet(struct tcb *tcb, struct packet *packet)
919 if (!tcb || !packet) {
920 DbgPrint("Invalid unicast: tcb[%p], packet[%p]\n", tcb, packet);
924 DbgPrint("Unicast packet\n");
925 return com_core_send(tcb->fd, (void *)packet_data(packet), packet_size(packet), DEFAULT_TIMEOUT);
/*!
 * \brief Send one packet to every other client of the given type.
 *
 * \param[in] tcb    TCB of the sender; it is skipped while iterating.
 * \param[in] packet Packet whose raw bytes are sent to each target.
 * \param[in] type   Only TCBs whose type equals this value receive the packet.
 *
 * \note
 * The TCB list of tcb->svc_ctx is walked without taking tcb_list_lock (see
 * the original remark below). Send failures are logged. The return value is
 * not visible in this view.
 */
932 HAPI int service_common_multicast_packet(struct tcb *tcb, struct packet *packet, int type)
936 struct service_context *svc_ctx;
939 if (!tcb || !packet) {
940 DbgPrint("Invalid multicast: tcb[%p], packet[%p]\n", tcb, packet);
944 svc_ctx = tcb->svc_ctx;
946 DbgPrint("Multicasting packets\n");
950 * Does not need to make a critical section from here.
952 EINA_LIST_FOREACH(svc_ctx->tcb_list, l, target) {
953 if (target == tcb || target->type != type) {
954 DbgPrint("Skip target: %p(%d) == %p/%d\n", target, target->type, tcb, type);
958 ret = com_core_send(target->fd, (void *)packet_data(packet), packet_size(packet), DEFAULT_TIMEOUT);
960 ErrPrint("Failed to send packet: %d\n", ret);
963 DbgPrint("Finish to multicast packet\n");
971 HAPI struct service_event_item *service_common_add_timer(struct service_context *svc_ctx, double timer, int (*timer_cb)(struct service_context *svc_cx, void *data), void *data)
973 struct service_event_item *item;
974 struct itimerspec spec;
976 item = calloc(1, sizeof(*item));
978 ErrPrint("Heap: %s\n", strerror(errno));
982 item->type = SERVICE_EVENT_TIMER;
983 item->info.timer.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
984 if (item->info.timer.fd < 0) {
985 ErrPrint("Error: %s\n", strerror(errno));
990 spec.it_interval.tv_sec = (time_t)timer;
991 spec.it_interval.tv_nsec = (timer - spec.it_interval.tv_sec) * 1000000000;
992 spec.it_value.tv_sec = 0;
993 spec.it_value.tv_nsec = 0;
995 if (timerfd_settime(item->info.timer.fd, 0, &spec, NULL) < 0) {
996 ErrPrint("Error: %s\n", strerror(errno));
997 if (close(item->info.timer.fd) < 0) {
998 ErrPrint("close: %s\n", strerror(errno));
1004 item->event_cb = timer_cb;
1005 item->cbdata = data;
1007 svc_ctx->event_list = eina_list_append(svc_ctx->event_list, item);
/*!
 * \brief Unregister a timer event created by service_common_add_timer().
 *
 * \param[in] svc_ctx Service context holding the event list.
 * \param[in] item    Timer item to remove; it must be present in the list.
 *
 * \note
 * The item is unlinked from svc_ctx->event_list and its timer descriptor is
 * closed; a close() failure is only logged. Return values and the release of
 * the item itself are not visible in this view.
 */
1015 HAPI int service_common_del_timer(struct service_context *svc_ctx, struct service_event_item *item)
1017 if (!eina_list_data_find(svc_ctx->event_list, item)) {
1018 ErrPrint("Invalid event item\n");
1022 svc_ctx->event_list = eina_list_remove(svc_ctx->event_list, item);
1024 if (close(item->info.timer.fd) < 0) {
1025 ErrPrint("close: %s\n", strerror(errno));
1031 HAPI int service_common_fd(struct service_context *ctx)