#include "Ecore.h"
#include "ecore_private.h"
-#ifdef EFL_HAVE_PTHREAD
typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
-typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
typedef struct _Ecore_Pthread Ecore_Pthread;
struct _Ecore_Pthread_Worker
{
- void (*func_heavy)(void *data);
- void (*func_end)(void *data);
+ union {
+ struct {
+ void (*func_heavy)(void *data);
+ } short_run;
+ struct {
+ void (*func_heavy)(Ecore_Thread *thread, void *data);
+ void (*func_notify)(Ecore_Thread *thread, void *data);
+
+ Ecore_Pipe *notify;
+
+#ifdef EFL_HAVE_PTHREAD
+ pthread_t self;
+#endif
+ } long_run;
+ } u;
+
void (*func_cancel)(void *data);
+ void (*func_end)(void *data);
const void *data;
Eina_Bool cancel : 1;
+ Eina_Bool long_run : 1;
};
+#ifdef EFL_HAVE_PTHREAD
+typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
+
struct _Ecore_Pthread_Data
{
Ecore_Pipe *p;
+ void *data;
pthread_t thread;
};
#endif
#ifdef EFL_HAVE_PTHREAD
static int _ecore_thread_count = 0;
-static Eina_List *_ecore_thread_data = NULL;
static Eina_List *_ecore_thread = NULL;
+static Eina_List *_ecore_thread_data = NULL;
+static Eina_List *_ecore_long_thread_data = NULL;
static Ecore_Event_Handler *del_handler = NULL;
static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;
ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
}
-static void *
-_ecore_thread_worker(Ecore_Pthread_Data *pth)
+static void
+_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
{
Ecore_Pthread_Worker *work;
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+ if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ;
- pthread_mutex_lock(&_mutex);
- _ecore_thread_count++;
- pthread_mutex_unlock(&_mutex);
+ work = *(Ecore_Pthread_Worker**)buffer;
- on_error:
+ if (work->cancel)
+ {
+ if (work->func_cancel)
+ work->func_cancel((void*) work->data);
+ }
+ else
+ {
+ if (work->func_end)
+ work->func_end((void*) work->data);
+ }
+
+ if (work->long_run) ecore_pipe_del(work->u.long_run.notify);
+ free(work);
+}
+
+static void
+_ecore_notify_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
+{
+ Ecore_Pthread_Worker *work;
+
+ if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ;
+
+ work = *(Ecore_Pthread_Worker**)buffer;
+
+ if (work->u.long_run.func_notify)
+ work->u.long_run.func_notify((Ecore_Thread *) work, (void*) work->data);
+}
+
+static void
+_ecore_short_job(Ecore_Pipe *end_pipe)
+{
+ Ecore_Pthread_Worker *work;
while (_ecore_thread_data)
{
pthread_mutex_unlock(&_mutex);
- work->func_heavy((void*) work->data);
+ work->u.short_run.func_heavy((void*) work->data);
- ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
+ ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker*));
}
+}
- pthread_mutex_lock(&_mutex);
- if (_ecore_thread_data)
+static void
+_ecore_long_job(Ecore_Pipe *end_pipe, pthread_t thread)
+{
+ Ecore_Pthread_Worker *work;
+
+ while (_ecore_long_thread_data)
{
+ pthread_mutex_lock(&_mutex);
+
+ if (!_ecore_long_thread_data)
+ {
+ pthread_mutex_unlock(&_mutex);
+ break;
+ }
+
+ work = eina_list_data_get(_ecore_long_thread_data);
+ _ecore_long_thread_data = eina_list_remove_list(_ecore_long_thread_data, _ecore_long_thread_data);
+
pthread_mutex_unlock(&_mutex);
- goto on_error;
+
+ work->u.long_run.self = thread;
+ work->u.long_run.func_heavy((Ecore_Thread *) work, (void*) work->data);
+
+ ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker*));
}
- _ecore_thread_count--;
+}
- pthread_mutex_unlock(&_mutex);
+static void *
+_ecore_direct_worker(Ecore_Pthread_Worker *work)
+{
+ Ecore_Pthread_Data *pth;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+
+ pth = malloc(sizeof (Ecore_Pthread_Data));
+ if (!pth) return NULL;
+
+ pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
+ if (!pth->p)
+ {
+ free(pth);
+ return NULL;
+ }
+ pth->thread = pthread_self();
+
+ work->u.long_run.self = pth->thread;
+ work->u.long_run.func_heavy((Ecore_Thread *) work, (void*) work->data);
+
+ ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
work = malloc(sizeof (Ecore_Pthread_Worker));
- if (!work) return NULL;
+ if (!work)
+ {
+ ecore_pipe_del(pth->p);
+ free(pth);
+ return NULL;
+ }
work->data = pth;
- work->func_heavy = NULL;
+ work->u.short_run.func_heavy = NULL;
work->func_end = (void*) _ecore_thread_end;
work->func_cancel = NULL;
work->cancel = EINA_FALSE;
+ work->long_run = EINA_FALSE;
ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
return pth->p;
}
-static void
-_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
+static void *
+_ecore_thread_worker(Ecore_Pthread_Data *pth)
{
Ecore_Pthread_Worker *work;
- if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ;
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
- work = *(Ecore_Pthread_Worker**)buffer;
+ pthread_mutex_lock(&_mutex);
+ _ecore_thread_count++;
+ pthread_mutex_unlock(&_mutex);
- if (work->cancel)
+ on_error:
+ if (_ecore_thread_data) _ecore_short_job(pth->p);
+ if (_ecore_long_thread_data) _ecore_long_job(pth->p, pth->thread);
+
+ /* FIXME: Check if there is long running task todo, and switch to long run handler. */
+
+ pthread_mutex_lock(&_mutex);
+ if (_ecore_thread_data)
{
- if (work->func_cancel)
- work->func_cancel((void*) work->data);
+ pthread_mutex_unlock(&_mutex);
+ goto on_error;
}
- else
+ if (_ecore_long_thread_data)
{
- work->func_end((void*) work->data);
+ pthread_mutex_unlock(&_mutex);
+ goto on_error;
}
- free(work);
+ _ecore_thread_count--;
+
+ pthread_mutex_unlock(&_mutex);
+
+ work = malloc(sizeof (Ecore_Pthread_Worker));
+ if (!work) return NULL;
+
+ work->data = pth;
+ work->u.short_run.func_heavy = NULL;
+ work->func_end = (void*) _ecore_thread_end;
+ work->func_cancel = NULL;
+ work->cancel = EINA_FALSE;
+ work->long_run = EINA_FALSE;
+
+ ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
+
+ return pth->p;
}
+
#endif
void
{
#ifdef EFL_HAVE_PTHREAD
Ecore_Pthread_Worker *work;
- Ecore_Pthread_Data *pth;
+ Ecore_Pthread_Data *pth = NULL;
+
+ if (!func_heavy) return NULL;
work = malloc(sizeof (Ecore_Pthread_Worker));
if (!work)
return NULL;
}
- work->func_heavy = func_heavy;
+ work->u.short_run.func_heavy = func_heavy;
work->func_end = func_end;
work->func_cancel = func_cancel;
work->cancel = EINA_FALSE;
+ work->long_run = EINA_FALSE;
work->data = data;
pthread_mutex_lock(&_mutex);
/* One more thread could be created. */
pth = malloc(sizeof (Ecore_Pthread_Data));
- if (!pth)
- goto on_error;
+ if (!pth) goto on_error;
pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
+ if (!pth->p) goto on_error;
if (pthread_create(&pth->thread, NULL, (void*) _ecore_thread_worker, pth) == 0)
return (Ecore_Thread*) work;
on_error:
+ if (pth)
+ {
+ if (pth->p) ecore_pipe_del(pth->p);
+ free(pth);
+ }
+
if (_ecore_thread_count == 0)
{
if (work->func_cancel)
work->func_cancel((void*) work->data);
free(work);
+ work = NULL;
}
- return NULL;
+ return (Ecore_Thread*) work;
#else
/*
If no thread and as we don't want to break app that rely on this
return EINA_TRUE;
#endif
}
+
+EAPI Eina_Bool
+ecore_thread_check(Ecore_Thread *thread)
+{
+ Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread;
+
+ if (!worker) return EINA_FALSE;
+ return worker->cancel;
+}
+
+EAPI Ecore_Thread *
+ecore_long_run(void (*func_heavy)(Ecore_Thread *thread, void *data),
+ void (*func_notify)(Ecore_Thread *thread, void *data),
+ void (*func_end)(void *data),
+ void (*func_cancel)(void *data),
+ const void *data,
+ Eina_Bool try_no_queue)
+{
+
+#ifdef EFL_HAVE_PTHREAD
+ Ecore_Pthread_Worker *worker;
+ Ecore_Pthread_Data *pth = NULL;
+
+ if (!func_heavy) return NULL;
+
+ worker = malloc(sizeof (Ecore_Pthread_Worker));
+ if (!worker) goto on_error;
+
+ worker->u.long_run.func_heavy = func_heavy;
+ worker->u.long_run.func_notify = func_notify;
+ worker->func_cancel = func_cancel;
+ worker->func_end = func_end;
+ worker->data = data;
+ worker->cancel = EINA_FALSE;
+ worker->long_run = EINA_TRUE;
+
+ worker->u.long_run.notify = ecore_pipe_add(_ecore_notify_handler, NULL);
+
+ if (!try_no_queue)
+ {
+ pthread_t t;
+
+ if (pthread_create(&t, NULL, (void*) _ecore_direct_worker, worker) == 0)
+ return (Ecore_Thread*) worker;
+ }
+
+ pthread_mutex_lock(&_mutex);
+ _ecore_long_thread_data = eina_list_append(_ecore_long_thread_data, worker);
+
+ if (_ecore_thread_count == _ecore_thread_count_max)
+ {
+ pthread_mutex_unlock(&_mutex);
+ return (Ecore_Thread*) worker;
+ }
+
+ pthread_mutex_unlock(&_mutex);
+
+ /* One more thread could be created. */
+ pth = malloc(sizeof (Ecore_Pthread_Data));
+ if (!pth) goto on_error;
+
+ pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
+ if (pth->p) goto on_error;
+
+ if (pthread_create(&pth->thread, NULL, (void*) _ecore_thread_worker, pth) == 0)
+ return (Ecore_Thread*) worker;
+
+ on_error:
+ if (pth)
+ {
+ if (pth->p) ecore_pipe_del(pth->p);
+ free(pth);
+ }
+
+ if (_ecore_thread_count == 0)
+ {
+ if (func_cancel) func_cancel((void*) data);
+
+ if (worker)
+ {
+ ecore_pipe_del(worker->u.long_run.notify);
+ free(worker);
+ worker = NULL;
+ }
+ }
+
+ return (Ecore_Thread*) worker;
+#else
+ Ecore_Pthread_Worker worker;
+
+ /*
+ If no thread and as we don't want to break app that rely on this
+ facility, we will lock the interface until we are done.
+ */
+ worker.u.long_run.func_heavy = func_heavy;
+ worker.u.long_run.func_notify = func_notify;
+ worker.u.long_run.notify = NULL;
+ worker.func_cancel = func_cancel;
+ worker.func_end = func_end;
+ worker.data = data;
+ worker.cancel = EINA_FALSE;
+ worker.long_run = EINA_TRUE;
+
+ func_heavy((Ecore_Thread*) &worker, data);
+
+ if (worker.cancel) func_cancel(data);
+ else func_end(data);
+
+ return NULL;
+#endif
+}
+
+EAPI Eina_Bool
+ecore_thread_notify(Ecore_Thread *thread, void *data)
+{
+ Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread;
+
+ if (!worker) return EINA_FALSE;
+ if (!worker->long_run) return EINA_FALSE;
+
+#ifdef EFL_HAVE_PTHREAD
+ if (worker->u.long_run.self != pthread_self()) return EINA_FALSE;
+
+ ecore_pipe_write(worker->u.long_run.notify, data, sizeof (void*));
+
+ return EINA_TRUE;
+#else
+ worker->u.long_run.func_notify(thread, data);
+
+ return EINA_TRUE;
+#endif
+}
+