char *name;
GIOChannel *channel;
guint tag;
- void *worker;
void *callback;
void *user_data;
};
struct aul_worker_s {
char *name;
GThread *thread;
- GRecMutex mutex;
+ GMutex mutex;
+ GCond cond;
+ GMainContext *context;
GMainLoop *loop;
GList *jobs;
};
return 0;
}
+static gboolean __job_handler(gpointer data)
+{
+ struct job_s *job = (struct job_s *)data;
+ aul_worker_job_cb callback = (aul_worker_job_cb)job->callback;
+
+ if (callback(job->user_data))
+ return G_SOURCE_CONTINUE;
+
+ _I("[__JOB__] name(%s)", job->name);
+ job->tag = 0;
+
+ return G_SOURCE_REMOVE;
+}
+
+int aul_worker_add_idle_job(aul_worker_h handle, const char *job_name,
+ aul_worker_job_cb callback, void *user_data)
+{
+ struct aul_worker_s *worker = (struct aul_worker_s *)handle;
+ struct job_s *job;
+ GSource *source;
+
+ if (!worker || !job_name || !callback) {
+ _E("Invalid parameter");
+ return AUL_R_EINVAL;
+ }
+
+ source = g_idle_source_new();
+ if (!source) {
+ _E("Failed to create GSource");
+ return AUL_R_ENOMEM;
+ }
+
+ job = __create_job(job_name, NULL, callback, user_data);
+ if (!job) {
+ _E("Failed to create job(%s)", job_name);
+ g_source_unref(source);
+ return AUL_R_ENOMEM;
+ }
+
+ g_source_set_callback(source, (GSourceFunc)__job_handler, job, NULL);
+ g_source_set_priority(source, G_PRIORITY_DEFAULT);
+
+ g_mutex_lock(&worker->mutex);
+ worker->jobs = g_list_append(worker->jobs, job);
+ job->tag = g_source_attach(source, worker->context);
+ g_mutex_unlock(&worker->mutex);
+
+ g_source_unref(source);
+
+ return AUL_R_OK;
+}
+
static gboolean __io_job_handler(GIOChannel *io, GIOCondition condition,
gpointer data)
{
int fd = g_io_channel_unix_get_fd(io);
struct job_s *job = (struct job_s *)data;
aul_worker_io_job_cb callback = (aul_worker_io_job_cb)job->callback;
- struct aul_worker_s *worker = (struct aul_worker_s *)job->worker;
if (callback(fd, job->user_data))
return G_SOURCE_CONTINUE;
_I("[__JOB__] name(%s)", job->name);
-
- g_rec_mutex_lock(&worker->mutex);
- worker->jobs = g_list_remove(worker->jobs, job);
- g_rec_mutex_unlock(&worker->mutex);
job->tag = 0;
- __destroy_job(job);
return G_SOURCE_REMOVE;
}
job = __create_job(job_name, channel, callback, user_data);
if (!job) {
_E("Failed to create job(%s)", job_name);
- g_source_destroy(source);
g_source_unref(source);
g_io_channel_unref(channel);
return AUL_R_ENOMEM;
g_source_set_callback(source, (GSourceFunc)__io_job_handler, job, NULL);
g_source_set_priority(source, G_PRIORITY_DEFAULT);
- g_rec_mutex_lock(&worker->mutex);
+ g_mutex_lock(&worker->mutex);
worker->jobs = g_list_append(worker->jobs, job);
- g_rec_mutex_unlock(&worker->mutex);
+ job->tag = g_source_attach(source, worker->context);
+ g_mutex_unlock(&worker->mutex);
- job->tag = g_source_attach(source,
- g_main_loop_get_context(worker->loop));
- job->worker = worker;
g_source_unref(source);
return AUL_R_OK;
}
+static bool __quit_cb(void *user_data)
+{
+ struct aul_worker_s *worker = (struct aul_worker_s *)user_data;
+
+ g_main_loop_quit(worker->loop);
+
+ return false;
+}
+
void aul_worker_destroy(aul_worker_h handle)
{
struct aul_worker_s *worker = (struct aul_worker_s *)handle;
if (!worker)
return;
- g_main_loop_quit(worker->loop);
+ if (worker->thread) {
+ aul_worker_add_idle_job(worker, "Quit", __quit_cb, worker);
+ g_thread_join(worker->thread);
+ }
- g_thread_join(worker->thread);
- g_thread_unref(worker->thread);
+ g_cond_clear(&worker->cond);
- g_list_free_full(worker->jobs, __destroy_job);
- g_main_loop_unref(worker->loop);
-
- g_rec_mutex_lock(&worker->mutex);
- g_rec_mutex_unlock(&worker->mutex);
- g_rec_mutex_clear(&worker->mutex);
+ g_mutex_lock(&worker->mutex);
+ g_mutex_unlock(&worker->mutex);
+ g_mutex_clear(&worker->mutex);
free(worker->name);
free(worker);
static gpointer __worker_thread_loop(gpointer data)
{
struct aul_worker_s *worker = (struct aul_worker_s *)data;
- GMainContext *context;
__set_comm(worker->name);
- context = g_main_loop_get_context(worker->loop);
- g_main_context_push_thread_default(context);
+ g_mutex_lock(&worker->mutex);
+ worker->context = g_main_context_new();
+ if (!worker->context) {
+ _E("Failed to create GMainContext");
+ g_cond_signal(&worker->cond);
+ g_mutex_unlock(&worker->mutex);
+ return NULL;
+ }
- g_main_loop_run(worker->loop);
+ worker->loop = g_main_loop_new(worker->context, FALSE);
+ g_main_context_push_thread_default(worker->context);
+ g_cond_signal(&worker->cond);
+ g_mutex_unlock(&worker->mutex);
- g_main_context_pop_thread_default(context);
+ if (!worker->loop) {
+ _E("Failed to create GMainLoop");
+ goto end;
+ }
+
+ g_main_loop_run(worker->loop);
+end:
+ g_list_free_full(worker->jobs, __destroy_job);
+ g_main_context_pop_thread_default(worker->context);
+ g_main_loop_unref(worker->loop);
+ g_main_context_unref(worker->context);
return NULL;
}
aul_worker_h aul_worker_create(const char *name)
{
struct aul_worker_s *worker;
- GMainContext *context;
if (!name) {
_E("Invalid parameter");
return NULL;
}
- g_rec_mutex_init(&worker->mutex);
+ g_mutex_init(&worker->mutex);
+ g_cond_init(&worker->cond);
worker->name = strdup(name);
if (!worker->name) {
return NULL;
}
- context = g_main_context_new();
- if (!context) {
- _E("Failed to create GMainContext");
- aul_worker_destroy(worker);
- return NULL;
- }
-
- worker->loop = g_main_loop_new(context, FALSE);
- g_main_context_unref(context);
- if (!worker->loop) {
- _E("Failed to create GMainLoop");
- aul_worker_destroy(worker);
- return NULL;
- }
+ g_mutex_lock(&worker->mutex);
worker->thread = g_thread_new(name, __worker_thread_loop, worker);
if (!worker->thread) {
return NULL;
}
+ g_cond_wait(&worker->cond, &worker->mutex);
+ g_mutex_unlock(&worker->mutex);
+
return worker;
}