Add worker thread to receive requests 95/211395/24
authorHwankyu Jhun <h.jhun@samsung.com>
Fri, 2 Aug 2019 09:46:19 +0000 (18:46 +0900)
committerHwanKyu Jhun <h.jhun@samsung.com>
Thu, 8 Aug 2019 23:05:15 +0000 (23:05 +0000)
Change-Id: I8189379f79335da0ae9ea5d012099fa7b219c38c
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
include/aul_worker.h [new file with mode: 0644]
src/aul_launch.c
src/aul_worker.c [new file with mode: 0644]

diff --git a/include/aul_worker.h b/include/aul_worker.h
new file mode 100644 (file)
index 0000000..ff24cd4
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __AUL_WORKER_H__
+#define __AUL_WORKER_H__
+
+#include <stdbool.h>
+
+typedef void *aul_worker_h;
+
+typedef bool (*aul_worker_io_job_cb)(int fd, void *user_data);
+
+aul_worker_h aul_worker_create(const char *name);
+
+void aul_worker_destroy(aul_worker_h handle);
+
+int aul_worker_add_io_job(aul_worker_h handle, const char *job_name,
+               int fd, aul_worker_io_job_cb callback, void *user_data);
+
+#endif /* __AUL_WORKER_H__ */
index c735520..d0839a3 100644 (file)
@@ -26,7 +26,6 @@
 #include <sys/time.h>
 #include <ctype.h>
 #include <glib.h>
-#include <gio/gio.h>
 #include <bundle_internal.h>
 
 #include "aul_api.h"
@@ -37,6 +36,7 @@
 #include "launch.h"
 #include "key.h"
 #include "aul_watch_control_internal.h"
+#include "aul_worker.h"
 
 #define ARRAY_SIZE(x) (sizeof(x) / sizeof(x[0]))
 
@@ -66,8 +66,8 @@ typedef struct data_control_provider_handler_s {
 } data_control_provider_handler;
 
 typedef struct launch_context_s {
-       GIOChannel *io;
-       guint source;
+       bool initialized;
+       aul_worker_h worker;
        aul_handler aul;
        subapp_handler subapp;
        data_control_provider_handler dcp;
@@ -272,21 +272,68 @@ static dispatcher __dispatcher[] = {
        [APP_TERM_BG_INSTANCE] = __dispatch_app_term_bg_inst,
 };
 
-static gboolean __aul_launch_handler(GIOChannel *io, GIOCondition condition,
-               gpointer data)
+static void __destroy_request(struct aul_request_s *req)
 {
-       int fd = g_io_channel_unix_get_fd(io);
-       struct aul_request_s req = { 0, };
+       if (req->b)
+               bundle_free(req->b);
+       free(req);
+}
+
+static struct aul_request_s *__create_request(int cmd, int clifd, bundle *b)
+{
+       struct aul_request_s *req;
+
+       req = malloc(sizeof(struct aul_request_s));
+       if (!req) {
+               _E("Out of memory");
+               return NULL;
+       }
+
+       req->cmd = cmd;
+       req->clifd = clifd;
+       req->b = b;
+
+       return req;
+}
+
+static gboolean __dispatch_request(gpointer data)
+{
+       struct aul_request_s *req = (struct aul_request_s *)data;
+
+       if (!__context.initialized) {
+               _W("Ignore request(%d)", req->cmd);
+               __destroy_request(req);
+               return G_SOURCE_REMOVE;
+       }
+
+       if (req->cmd >= APP_START && req->cmd < ARRAY_SIZE(__dispatcher) &&
+                       __dispatcher[req->cmd]) {
+               _W("Command(%s:%d)",
+                               aul_cmd_convert_to_string(req->cmd), req->cmd);
+               __dispatcher[req->cmd](req);
+       } else {
+               _E("Command(%s:%d) is not available",
+                               aul_cmd_convert_to_string(req->cmd), req->cmd);
+       }
+
+       __destroy_request(req);
+
+       return G_SOURCE_REMOVE;
+}
+
+static bool __worker_io_job_cb(int fd, void *user_data)
+{
+       struct aul_request_s *req;
        app_pkt_t *pkt;
        bundle *b = NULL;
        int clifd;
        struct ucred cr;
-       int r;
+       int ret;
 
        pkt = aul_sock_recv_pkt(fd, &clifd, &cr);
        if (!pkt) {
                _E("Failed to receive the packet");
-               return G_SOURCE_CONTINUE;
+               return true;
        }
 
        if (pkt->cmd != WIDGET_GET_CONTENT) {
@@ -294,13 +341,13 @@ static gboolean __aul_launch_handler(GIOChannel *io, GIOCondition condition,
                        close(clifd);
                        clifd = -1;
                } else {
-                       r = aul_sock_send_result(clifd, 0);
-                       if (r < 0) {
+                       ret = aul_sock_send_result(clifd, 0);
+                       if (ret < 0) {
                                _E("Failed to send result. cmd(%s:%d)",
                                        aul_cmd_convert_to_string(pkt->cmd),
                                        pkt->cmd);
                                free(pkt);
-                               return G_SOURCE_CONTINUE;;
+                               return true;
                        }
                        clifd = -1;
                }
@@ -313,48 +360,35 @@ static gboolean __aul_launch_handler(GIOChannel *io, GIOCondition condition,
                        if (clifd > 0)
                                close(clifd);
                        free(pkt);
-                       return G_SOURCE_CONTINUE;
+                       return true;
                }
        }
 
-       req.cmd = pkt->cmd;
-       req.clifd = clifd;
-       req.b = b;
-
+       req = __create_request(pkt->cmd, clifd, b);
        free(pkt);
-
-       if (req.cmd >= APP_START && req.cmd < ARRAY_SIZE(__dispatcher) &&
-                       __dispatcher[req.cmd]) {
-               _W("Command(%s:%d)",
-                               aul_cmd_convert_to_string(req.cmd), req.cmd);
-               __dispatcher[req.cmd](&req);
-       } else {
-               _E("Command(%s:%d) is not available",
-                               aul_cmd_convert_to_string(req.cmd), req.cmd);
+       if (!req) {
+               bundle_free(b);
+               return true;
        }
 
-       if (req.b)
-               bundle_free(req.b);
+       g_idle_add(__dispatch_request, req);
 
-       return G_SOURCE_CONTINUE;
+       return true;
 }
 
 static void __finalize_context(void)
 {
-       if (__context.source) {
-               g_source_remove(__context.source);
-               __context.source = 0;
+       if (__context.worker) {
+               aul_worker_destroy(__context.worker);
+               __context.worker = NULL;
        }
 
-       if (__context.io) {
-               g_io_channel_unref(__context.io);
-               __context.io = NULL;
-       }
+       __context.initialized = false;
 }
 
 static int __initialize_context(void)
 {
-       GIOCondition cond = G_IO_IN | G_IO_PRI | G_IO_HUP | G_IO_ERR;
+       int ret;
        int fd;
 
        fd = aul_initialize();
@@ -363,21 +397,21 @@ static int __initialize_context(void)
                return fd;
        }
 
-       __context.io = g_io_channel_unix_new(fd);
-       if (!__context.io) {
-               _E("Failed to create gio channel");
+       __context.worker = aul_worker_create("aul+");
+       if (!__context.worker) {
                __finalize_context();
                return AUL_R_ERROR;
        }
 
-       __context.source = g_io_add_watch(__context.io,
-                       cond, __aul_launch_handler, NULL);
-       if (!__context.source) {
-               _E("Failed to add gio watch");
+       ret = aul_worker_add_io_job(__context.worker, "AUL", fd,
+                       __worker_io_job_cb, NULL);
+       if (ret < 0) {
                __finalize_context();
-               return AUL_R_ERROR;
+               return ret;
        }
 
+       __context.initialized = true;
+
        return AUL_R_OK;
 }
 
diff --git a/src/aul_worker.c b/src/aul_worker.c
new file mode 100644 (file)
index 0000000..c488b1c
--- /dev/null
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/syscall.h>
+#include <fcntl.h>
+#include <glib.h>
+#include <gio/gio.h>
+
+#include "aul.h"
+#include "aul_util.h"
+#include "aul_worker.h"
+
+struct job_s {
+       char *name;
+       GIOChannel *channel;
+       guint tag;
+       void *worker;
+       void *callback;
+       void *user_data;
+};
+
+struct aul_worker_s {
+       char *name;
+       GThread *thread;
+       GRecMutex mutex;
+       GMainLoop *loop;
+       GList *jobs;
+};
+
+static void __destroy_job(gpointer data)
+{
+       struct job_s *job = (struct job_s *)data;
+
+       if (job->channel)
+               g_io_channel_unref(job->channel);
+
+       if (job->tag)
+               g_source_remove(job->tag);
+
+       free(job->name);
+       free(job);
+}
+
+static struct job_s *__create_job(const char *name,
+               GIOChannel *channel,
+               void *callback,
+               void *user_data)
+{
+       struct job_s *job;
+
+       job = calloc(1, sizeof(struct job_s));
+       if (!job) {
+               _E("Failed to create job");
+               return NULL;
+       }
+
+       job->name = strdup(name);
+       if (!job->name) {
+               _E("Failed to duplicate job name");
+               __destroy_job(job);
+               return NULL;
+       }
+
+       job->channel = channel;
+       job->callback = callback;
+       job->user_data = user_data;
+
+       return job;
+}
+
+static int __set_comm(const char *name)
+{
+       int fd;
+       ssize_t bytes_written;
+       char path[PATH_MAX];
+       pid_t tid = syscall(__NR_gettid);
+
+       _I("[%s] TID(%d)", name, tid);
+       snprintf(path, sizeof(path), "/proc/%d/comm", tid);
+       fd = open(path, O_WRONLY);
+       if (fd < 0) {
+               _E("Failed to open %s. error(%d)", path, errno);
+               return -1;
+       }
+
+       bytes_written = write(fd, name, strlen(name) + 1);
+       if (bytes_written < 0) {
+               _E("Failed to write name(%s)", name);
+               close(fd);
+               return -1;
+       }
+
+       close(fd);
+
+       return 0;
+}
+
+static gboolean __io_job_handler(GIOChannel *io, GIOCondition condition,
+               gpointer data)
+{
+       int fd = g_io_channel_unix_get_fd(io);
+       struct job_s *job = (struct job_s *)data;
+       aul_worker_io_job_cb callback = (aul_worker_io_job_cb)job->callback;
+       struct aul_worker_s *worker = (struct aul_worker_s *)job->worker;
+
+       if (callback(fd, job->user_data))
+               return G_SOURCE_CONTINUE;
+
+       _I("[__JOB__] name(%s)", job->name);
+
+       g_rec_mutex_lock(&worker->mutex);
+       worker->jobs = g_list_remove(worker->jobs, job);
+       g_rec_mutex_unlock(&worker->mutex);
+       job->tag = 0;
+       __destroy_job(job);
+
+       return G_SOURCE_REMOVE;
+}
+
+int aul_worker_add_io_job(aul_worker_h handle, const char *job_name,
+               int fd, aul_worker_io_job_cb callback, void *user_data)
+{
+       GIOCondition cond = G_IO_IN | G_IO_PRI | G_IO_HUP | G_IO_ERR;
+       struct aul_worker_s *worker = (struct aul_worker_s *)handle;
+       struct job_s *job;
+       GIOChannel *channel;
+       GSource *source;
+
+       if (!worker || !job_name || fd < 0 || !callback) {
+               _E("Invalid parameter");
+               return AUL_R_EINVAL;
+       }
+
+       channel = g_io_channel_unix_new(fd);
+       if (!channel) {
+               _E("Failed to create GIOChannel");
+               return AUL_R_ENOMEM;
+       }
+
+       source = g_io_create_watch(channel, cond);
+       if (!source) {
+               _E("Failed to create GSource");
+               g_io_channel_unref(channel);
+               return AUL_R_ENOMEM;
+       }
+
+       job = __create_job(job_name, channel, callback, user_data);
+       if (!job) {
+               _E("Failed to create job(%s)", job_name);
+               g_source_destroy(source);
+               g_source_unref(source);
+               g_io_channel_unref(channel);
+               return AUL_R_ENOMEM;
+       }
+
+       g_source_set_callback(source, (GSourceFunc)__io_job_handler, job, NULL);
+       g_source_set_priority(source, G_PRIORITY_DEFAULT);
+
+       g_rec_mutex_lock(&worker->mutex);
+       worker->jobs = g_list_append(worker->jobs, job);
+       g_rec_mutex_unlock(&worker->mutex);
+
+       job->tag = g_source_attach(source,
+                       g_main_loop_get_context(worker->loop));
+       job->worker = worker;
+       g_source_unref(source);
+
+       return AUL_R_OK;
+}
+
+void aul_worker_destroy(aul_worker_h handle)
+{
+       struct aul_worker_s *worker = (struct aul_worker_s *)handle;
+
+       if (!worker)
+               return;
+
+       g_main_loop_quit(worker->loop);
+
+       g_thread_join(worker->thread);
+       g_thread_unref(worker->thread);
+
+       g_list_free_full(worker->jobs, __destroy_job);
+       g_main_loop_unref(worker->loop);
+
+       g_rec_mutex_lock(&worker->mutex);
+       g_rec_mutex_unlock(&worker->mutex);
+       g_rec_mutex_clear(&worker->mutex);
+
+       free(worker->name);
+       free(worker);
+}
+
+static gpointer __worker_thread_loop(gpointer data)
+{
+       struct aul_worker_s *worker = (struct aul_worker_s *)data;
+       GMainContext *context;
+
+       __set_comm(worker->name);
+
+       context = g_main_loop_get_context(worker->loop);
+       g_main_context_push_thread_default(context);
+
+       g_main_loop_run(worker->loop);
+
+       g_main_context_pop_thread_default(context);
+
+       return NULL;
+}
+
+aul_worker_h aul_worker_create(const char *name)
+{
+       struct aul_worker_s *worker;
+       GMainContext *context;
+
+       if (!name) {
+               _E("Invalid parameter");
+               return NULL;
+       }
+
+       worker = calloc(1, sizeof(struct aul_worker_s));
+       if (!worker) {
+               _E("Out of memory");
+               return NULL;
+       }
+
+       g_rec_mutex_init(&worker->mutex);
+
+       worker->name = strdup(name);
+       if (!worker->name) {
+               _E("Failed to duplicate name");
+               aul_worker_destroy(worker);
+               return NULL;
+       }
+
+       context = g_main_context_new();
+       if (!context) {
+               _E("Failed to create GMainContext");
+               aul_worker_destroy(worker);
+               return NULL;
+       }
+
+       worker->loop = g_main_loop_new(context, FALSE);
+       g_main_context_unref(context);
+       if (!worker->loop) {
+               _E("Failed to create GMainLoop");
+               aul_worker_destroy(worker);
+               return NULL;
+       }
+
+       worker->thread = g_thread_new(name, __worker_thread_loop, worker);
+       if (!worker->thread) {
+               _E("Failed to create worker thread");
+               aul_worker_destroy(worker);
+               return NULL;
+       }
+
+       return worker;
+}