+++ /dev/null
-Copyright (c) 2012, Ronald B. Cemer\r
-All rights reserved.\r
-\r
-Redistribution and use in source and binary forms, with or without\r
-modification, are permitted provided that the following conditions are met:\r
-\r
-Redistributions of source code must retain the above copyright notice,\r
-this list of conditions and the following disclaimer.\r
-\r
-Redistributions in binary form must reproduce the above copyright notice,\r
-this list of conditions and the following disclaimer in the\r
-documentation and/or other materials provided with the distribution.\r
-\r
-Neither the name of Ronald B. Cemer nor the names of its\r
-contributors may be used to endorse or promote products derived from\r
-this software without specific prior written permission.\r
-\r
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\r
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED\r
-TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR\r
-PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR\r
-CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,\r
-EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,\r
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;\r
-OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,\r
-WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR\r
-OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF\r
-ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\r
Name: mused
Summary: A multimedia daemon
-Version: 0.3.92
+Version: 0.3.93
Release: 0
Group: System/Libraries
-License: Apache-2.0 and BSD-3-Clause
+License: Apache-2.0
Source0: %{name}-%{version}.tar.gz
Source1: muse-server.service
Source2: gtest.input
%files
%manifest %{name}.manifest
%{_libdir}/libmuse-*.so.*
-%license LICENSE.APLv2 LICENSE.BSD
+%license LICENSE.APLv2
%{_unitdir}/muse-server.service
%if ("%{_vd_cfg_product_type}" == "AUDIO")
%{_unitdir}/starter.target.wants/muse-server.service
#include "muse_server_security.h"
#include "muse_server_system.h"
#include "muse_server_watchdog.h"
-#include "muse_server_workqueue.h"
#include "muse_server_tool.h"
#include <systemd/sd-daemon.h>
#include <pwd.h>
#define MS_RECV_TRY_COUNT_MAX 3
#define MSG_DONE "DONE"
-gboolean ms_ipc_job_function(ms_workqueue_job_t *job);
-gboolean ms_ipc_data_job_function(ms_workqueue_job_t *job);
+gboolean ms_ipc_create_msg_dispatch_worker(muse_module_h m);
+gboolean ms_ipc_create_data_dispatch_worker(muse_module_h m);
typedef enum {
MUSE_SERVER_STATE_IDLE,
ms_security_t *security;
ms_system_t *system;
ms_watchdog_t *watchdog;
- ms_workqueue_t *workqueue;
ms_state_e state;
GMutex state_lock;
struct timeval tv_s;
+++ /dev/null
-/**
-* Multithreaded work queue.
-* Copyright (c) 2012 Ronald Bennett Cemer
-* This software is licensed under the BSD license.
-* See the accompanying LICENSE.txt for details.
-*/
-
-#ifndef __MUSE_SERVER_WORKQUEUE_H__
-#define __MUSE_SERVER_WORKQUEUE_H__
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include "muse_core_internal.h"
-
-#define MUSE_WORK_THREAD_NUM 1
-
-typedef struct ms_workqueue_job {
- gboolean(*job_function) (struct ms_workqueue_job *job);
- void *user_data;
- struct ms_workqueue_job *prev;
- struct ms_workqueue_job *next;
-} ms_workqueue_job_t;
-
-typedef struct ms_workqueue_worker {
- pthread_t thread;
- int terminate;
- struct ms_workqueue *workqueue;
- struct ms_workqueue_worker *prev;
- struct ms_workqueue_worker *next;
-} ms_workqueue_worker_t;
-
-typedef struct ms_workqueue {
- struct ms_workqueue_worker *workers;
- struct ms_workqueue_job *waiting_jobs;
- pthread_mutex_t jobs_mutex;
- pthread_cond_t jobs_cond;
-} ms_workqueue_t;
-
-void ms_workqueue_init(ms_workqueue_t *workqueue);
-void ms_workqueue_deinit(ms_workqueue_t *workqueue);
-void ms_workqueue_add_job(ms_workqueue_t *workqueue, ms_workqueue_job_t *job);
-
-#ifdef __cplusplus
-}
-#endif
-#endif /* __MUSE_SERVER_WORKQUEUE_H__ */
return NULL;
}
-int ms_ipc_get_module_idx_from_job(ms_workqueue_job_t *job)
+gboolean ms_ipc_create_msg_dispatch_worker(muse_module_h m)
{
- LOGD("Enter");
- muse_module_h m = NULL;
-
- muse_return_val_if_fail(job, MM_ERROR_INVALID_ARGUMENT);
-
- m = (muse_module_h)job->user_data;
- muse_return_val_if_fail(m, MM_ERROR_INVALID_ARGUMENT);
-
- LOGD("Leave");
- return m->idx;
-}
-
-gboolean ms_ipc_job_function(ms_workqueue_job_t *job)
-{
- muse_module_h m = NULL;
GError *error = NULL;
LOGD("Enter");
muse_return_val_if_fail(ms_get_instance(), FALSE);
- muse_return_val_if_fail(job, FALSE);
muse_return_val_if_fail(ms_is_server_ready(), FALSE);
- m = (muse_module_h)job->user_data;
muse_return_val_if_fail(muse_server_module_is_valid(m), FALSE);
SECURE_LOGD("[PID %d %p] module's msg channel fd : %d", m->pid, m, m->ch[MUSE_CHANNEL_MSG].sock_fd);
ms_log_process_info(ms_get_instance()->pid);
}
- free(job);
muse_return_val_if_fail(m->ch[MUSE_CHANNEL_MSG].thread, FALSE);
LOGD("Leave");
return TRUE;
}
-gboolean ms_ipc_data_job_function(ms_workqueue_job_t *job)
+gboolean ms_ipc_create_data_dispatch_worker(muse_module_h m)
{
- muse_module_h m = NULL;
GError *error = NULL;
LOGD("Enter");
muse_return_val_if_fail(ms_get_instance(), FALSE);
- muse_return_val_if_fail(job, FALSE);
muse_return_val_if_fail(ms_is_server_ready(), FALSE);
-
- m = (muse_module_h)job->user_data;
- if (!muse_server_module_is_valid(m)) {
- free(job);
- LOGW("destroy is already called");
- return TRUE;
- }
+ muse_return_val_if_fail(muse_server_module_is_valid(m), TRUE);
m->ch[MUSE_CHANNEL_DATA].thread = g_thread_try_new(DATA_THREAD_NAME, _ms_ipc_data_worker, (gpointer)m, &error);
if (!m->ch[MUSE_CHANNEL_DATA].thread && error) {
muse_core_connection_close(m->ch[MUSE_CHANNEL_MSG].sock_fd);
}
- free(job);
muse_return_val_if_fail(m->ch[MUSE_CHANNEL_DATA].thread, FALSE);
LOGD("Leave");
return TRUE;
}
-
#define MUSE_LWIPC_WAIT_TIME 1000
#endif
-static gboolean (*job_functions[MUSE_CHANNEL_MAX])
- (ms_workqueue_job_t *job) = {
- ms_ipc_job_function,
- ms_ipc_data_job_function
- };
-
static const char *channel_name[MUSE_CHANNEL_MAX] = {
"msg",
"data"
muse_module_h peeked_m = NULL;
muse_module_h candidate_m = NULL;
intptr_t module_addr = 0;
- ms_workqueue_job_t *job = NULL;
GQueue *instance_queue = NULL;
ms_connection_t *connection = NULL;
LOGI("Enter");
muse_return_val_if_fail(muse_server, FALSE);
- muse_return_val_if_fail(muse_server->workqueue, FALSE);
connection = muse_server->connection;
muse_return_val_if_fail(connection, FALSE);
m->ch[MUSE_CHANNEL_MSG].sock_fd = client_sockfd;
m->pid = pid;
g_mutex_init(&m->dispatch_lock);
+
+ ms_ipc_create_msg_dispatch_worker(m);
+
} else {
_ms_get_module_addr(client_sockfd, &module_addr);
}
ms_connection_unlock(connection);
- }
- job = calloc(1, sizeof(ms_workqueue_job_t));
- if (!job) {
- LOGE("failed to allocate memory for job state");
- goto out;
+ ms_ipc_create_data_dispatch_worker(m);
}
- job->job_function = job_functions[channel];
- job->user_data = m;
-
- if (ms_is_server_ready() && muse_server_module_is_valid(m))
- ms_workqueue_add_job(muse_server->workqueue, job);
- else
- free(job);
-
_ms_unlock_state();
LOGI("Leave");
muse_core_connection_close(m->ch[MUSE_CHANNEL_MSG].sock_fd);
}
- MUSE_FREE(job);
_ms_unlock_state();
LOGE("Fail to initialize server watchdog");
#endif
- muse_server->workqueue = (ms_workqueue_t *)calloc(1, sizeof(ms_workqueue_t));
- muse_return_if_fail(muse_server->workqueue);
- ms_workqueue_init(muse_server->workqueue);
-
muse_server->connection = calloc(1, sizeof(ms_connection_t));
muse_return_if_fail(muse_server->connection);
ms_connection_init(muse_server->connection);
muse_return_val_if_fail(muse_server->log, retval);
muse_return_val_if_fail(muse_server->security, retval);
muse_return_val_if_fail(muse_server->watchdog, retval);
- muse_return_val_if_fail(muse_server->workqueue, retval);
muse_return_val_if_fail(muse_server->diag_thread, retval);
ms_recursive_rmdir(MUSE_DATA_ROOT_PATH);
for (idx = 0; idx < muse_server->conf->host_cnt; idx++)
ms_module_deinit(muse_server->module[idx]);
- ms_workqueue_deinit(muse_server->workqueue);
- muse_server->workqueue = NULL;
-
ms_security_deinit(muse_server->security);
muse_server->security = NULL;
+++ /dev/null
-/**
-* Multithreaded work queue.
-* Copyright (c) 2012 Ronald Bennett Cemer
-* This software is licensed under the BSD license.
-* See the accompanying LICENSE.txt for details.
-*/
-
-#include "muse_server_private.h"
-
-#define LL_ADD(item, list) { \
- item->prev = NULL; \
- item->next = list; \
- list = item; \
-}
-
-#define LL_REMOVE(item, list) { \
- if (item->prev) item->prev->next = item->next; \
- if (item->next) item->next->prev = item->prev; \
- if (list == item) list = item->next; \
- item->prev = item->next = NULL; \
-}
-
-static void *_ms_workqueue_worker_function(void *ptr);
-
-static void *_ms_workqueue_worker_function(void *ptr)
-{
- ms_workqueue_worker_t *worker = (ms_workqueue_worker_t *) ptr;
- ms_workqueue_job_t *job;
-
- muse_return_val_if_fail(worker, NULL);
-
- while (1) {
- pthread_mutex_lock(&worker->workqueue->jobs_mutex);
-
- if (worker->terminate) {
- LOGW("The value (%d) to be terminated is set", worker->terminate);
- pthread_mutex_unlock(&worker->workqueue->jobs_mutex);
- break;
- }
-
- while (!worker->workqueue->waiting_jobs)
- pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mutex);
-
- job = worker->workqueue->waiting_jobs;
-
- LL_REMOVE(job, worker->workqueue->waiting_jobs);
- LOGD("remove job %p", job);
-
- if (ms_is_server_ready() && !job->job_function(job)) {
- pthread_mutex_unlock(&worker->workqueue->jobs_mutex);
- LOGE("Error - Execute the workqueue job, Restart muse server...");
- ms_respawn(SIGABRT);
- break;
- }
- pthread_mutex_unlock(&worker->workqueue->jobs_mutex);
- }
-
- free(worker);
-
- LOGW("thread exit...");
- pthread_exit(NULL);
-}
-
-void ms_workqueue_init(ms_workqueue_t *workqueue)
-{
- ms_workqueue_worker_t *worker;
- int idx;
- pthread_attr_t attr;
-
- LOGD("Enter");
-
- muse_return_if_fail(workqueue);
-
- pthread_mutex_init(&workqueue->jobs_mutex, NULL);
- pthread_cond_init(&workqueue->jobs_cond, NULL);
-
- muse_return_if_fail(pthread_attr_init(&attr) == 0);
- if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) {
- LOGE("Failed to set detach state");
- goto out;
- }
-
- for (idx = 0; idx < MUSE_WORK_THREAD_NUM; idx++) {
- worker = calloc(1, sizeof(ms_workqueue_worker_t));
- muse_return_if_fail(worker);
-
- worker->workqueue = workqueue;
- if (pthread_create(&worker->thread, &attr, _ms_workqueue_worker_function, (void *)worker)) {
- LOGE("Failed to start all worker threads");
- free(worker);
- goto out;
- }
-
- LL_ADD(worker, worker->workqueue->workers);
- }
-
-out:
- muse_return_if_fail(pthread_attr_destroy(&attr) == 0);
-
- LOGD("Leave");
-}
-
-void ms_workqueue_deinit(ms_workqueue_t *workqueue)
-{
- ms_workqueue_worker_t *worker = NULL;
-
- LOGD("Enter");
-
- muse_return_if_fail(workqueue);
-
- /* Set all workers to terminate. */
- for (worker = workqueue->workers; worker; worker = worker->next)
- worker->terminate = 1;
-
- pthread_mutex_lock(&workqueue->jobs_mutex);
- workqueue->workers = NULL;
- workqueue->waiting_jobs = NULL;
- pthread_cond_broadcast(&workqueue->jobs_cond);
- pthread_mutex_unlock(&workqueue->jobs_mutex);
-
- pthread_mutex_destroy(&workqueue->jobs_mutex);
- pthread_cond_destroy(&workqueue->jobs_cond);
- free(workqueue);
-
- LOGD("Leave");
-}
-
-void ms_workqueue_add_job(ms_workqueue_t *workqueue, ms_workqueue_job_t *job)
-{
- muse_return_if_fail(workqueue);
- muse_return_if_fail(job);
-
- pthread_mutex_lock(&workqueue->jobs_mutex);
- LL_ADD(job, workqueue->waiting_jobs);
- LOGD("add job %p", job);
- pthread_cond_signal(&workqueue->jobs_cond);
- pthread_mutex_unlock(&workqueue->jobs_mutex);
-}
-