#ifdef EFL_HAVE_THREADS
+# define LK(x) Eina_Lock x
+# define LKI(x) eina_lock_new(&(x))
+# define LKD(x) eina_lock_free(&(x))
+# define LKL(x) eina_lock_take(&(x))
+# define LKU(x) eina_lock_release(&(x))
+
+# define CD(x) Eina_Condition x
+# define CDI(x, m) eina_condition_new(&(x), &(m))
+# define CDD(x) eina_condition_free(&(x))
+# define CDB(x) eina_condition_broadcast(&(x))
+# define CDW(x, t) eina_condition_timedwait(&(x), t)
+
+# define LRWK(x) Eina_RWLock x
+# define LRWKI(x) eina_rwlock_new(&(x))
+# define LRWKD(x) eina_rwlock_free(&(x))
+# define LRWKWL(x) eina_rwlock_take_write(&(x))
+# define LRWKRL(x) eina_rwlock_take_read(&(x))
+# define LRWKU(x) eina_rwlock_release(&(x))
+
# ifdef EFL_HAVE_POSIX_THREADS
# include <pthread.h>
# ifdef __linux__
# define PHE(x, y) pthread_equal(x, y)
# define PHS() pthread_self()
# define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d)
-# define PHJ(x, p) pthread_join(x, (void **)(&(p)))
+# define PHJ(x) pthread_join(x, NULL)
# define PHA(x) pthread_cancel(x)
-# define CD(x) pthread_cond_t x
-# define CDI(x) pthread_cond_init(&(x), NULL);
-# define CDD(x) pthread_cond_destroy(&(x));
-# define CDB(x) pthread_cond_broadcast(&(x));
-# define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
-
-# define LK(x) pthread_mutex_t x
-# define LKI(x) pthread_mutex_init(&(x), NULL);
-# define LKD(x) pthread_mutex_destroy(&(x));
-# define LKL(x) pthread_mutex_lock(&(x));
-# define LKU(x) pthread_mutex_unlock(&(x));
-
-# define LRWK(x) pthread_rwlock_t x
-# define LRWKI(x) pthread_rwlock_init(&(x), NULL);
-# define LRWKD(x) pthread_rwlock_destroy(&(x));
-# define LRWKWL(x) pthread_rwlock_wrlock(&(x));
-# define LRWKRL(x) pthread_rwlock_rdlock(&(x));
-# define LRWKU(x) pthread_rwlock_unlock(&(x));
-
# else /* EFL_HAVE_WIN32_THREADS */
# define WIN32_LEAN_AND_MEAN
return 0;
}
-# define PHJ(x, p) _ecore_thread_win32_join(x, (void **)(&(p)))
+# define PHJ(x) _ecore_thread_win32_join(x, NULL)
# define PHA(x) TerminateThread(x->thread, 0)
-# define LK(x) HANDLE x
-# define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
-# define LKD(x) CloseHandle(x)
-# define LKL(x) WaitForSingleObject(x, INFINITE)
-# define LKU(x) ReleaseMutex(x)
-
-typedef struct
-{
- HANDLE semaphore;
- LONG threads_count;
- CRITICAL_SECTION threads_count_lock;
-} win32_cond;
-
-# define CD(x) win32_cond * x
-
-# define CDI(x) \
- do { \
- x = (win32_cond *)calloc(1, sizeof(win32_cond)); \
- if (x) \
- { \
- x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
- if (x->semaphore) \
- InitializeCriticalSection(&x->threads_count_lock); \
- else \
- { \
- free(x); \
- x = NULL; \
- } \
- } \
- } while (0)
-
-# define CDD(x) \
- do { \
- CloseHandle(x->semaphore); \
- free(x); \
- x = NULL; \
- } while (0)
-
-# define CDB(x) \
- do { \
- EnterCriticalSection(&x->threads_count_lock); \
- if (x->threads_count > 0) \
- ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
- LeaveCriticalSection (&x->threads_count_lock); \
- } while (0)
-
-int
-_ecore_thread_win32_cond_timedwait(win32_cond *c,
- HANDLE *external_mutex,
- struct timeval *t)
-{
- DWORD res;
- DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
- LKL(external_mutex);
- EnterCriticalSection (&c->threads_count_lock);
- c->threads_count++;
- LeaveCriticalSection (&c->threads_count_lock);
- LKU(external_mutex);
- res = WaitForSingleObject(c->semaphore, val);
- if (res == WAIT_OBJECT_0)
- return 0;
- else
- return -1;
-}
-
-# define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
-
-typedef struct
-{
- LONG readers_count;
- LONG writers_count;
- int readers;
- int writers;
- LK(mutex);
- CD(cond_read);
- CD(cond_write);
-} win32_rwl;
-
-# define LRWK(x) win32_rwl * x
-# define LRWKI(x) \
- do { \
- x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
- if (x) \
- { \
- LKI(x->mutex); \
- if (x->mutex) \
- { \
- CDI(x->cond_read); \
- if (x->cond_read) \
- { \
- CDI(x->cond_write); \
- if (!x->cond_write) \
- { \
- CDD(x->cond_read); \
- LKD(x->mutex); \
- free(x); \
- x = NULL; \
- } \
- } \
- else \
- { \
- LKD(x->mutex); \
- free(x); \
- x = NULL; \
- } \
- } \
- else \
- { \
- free(x); \
- x = NULL; \
- } \
- } \
- } while (0)
-
-# define LRWKD(x) \
- do { \
- LKU(x->mutex); \
- LKD(x->mutex); \
- CDD(x->cond_write); \
- CDD(x->cond_read); \
- free(x); \
- } while (0)
-# define LRWKWL(x) \
- do { \
- DWORD res; \
- LKU(x->mutex); \
- if (x->writers || x->readers > 0) \
- { \
- x->writers_count++; \
- while (x->writers || x->readers > 0) \
- { \
- EnterCriticalSection(&x->cond_write->threads_count_lock); \
- x->cond_read->threads_count++; \
- LeaveCriticalSection(&x->cond_write->threads_count_lock); \
- res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
- if (res != WAIT_OBJECT_0) break; \
- } \
- x->writers_count--; \
- } \
- if (res == 0) x->writers_count = 1; \
- LKU(x->mutex); \
- } while (0)
-# define LRWKRL(x) \
- do { \
- DWORD res; \
- LKL(x->mutex); \
- if (x->writers) \
- { \
- x->readers_count++; \
- while (x->writers) \
- { \
- EnterCriticalSection(&x->cond_write->threads_count_lock); \
- x->cond_read->threads_count++; \
- LeaveCriticalSection(&x->cond_write->threads_count_lock); \
- res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
- if (res != WAIT_OBJECT_0) break; \
- } \
- x->readers_count--; \
- } \
- if (res == 0) \
- x->readers++; \
- LKU(x->mutex); \
- } while (0)
-# define LRWKU(x) \
- do { \
- LKL(x->mutex); \
- if (x->writers) \
- { \
- x->writers = 0; \
- if (x->readers_count == 1) \
- { \
- EnterCriticalSection(&x->cond_read->threads_count_lock); \
- if (x->cond_read->threads_count > 0) \
- ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \
- LeaveCriticalSection(&x->cond_read->threads_count_lock); \
- } \
- else if (x->readers_count > 0) \
- CDB(x->cond_read); \
- else if (x->writers_count > 0) \
- { \
- EnterCriticalSection (&x->cond_write->threads_count_lock); \
- if (x->cond_write->threads_count > 0) \
- ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
- LeaveCriticalSection (&x->cond_write->threads_count_lock); \
- } \
- } \
- else if (x->readers > 0) \
- { \
- x->readers--; \
- if (x->readers == 0 && x->writers_count > 0) \
- { \
- EnterCriticalSection (&x->cond_write->threads_count_lock); \
- if (x->cond_write->threads_count > 0) \
- ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
- LeaveCriticalSection (&x->cond_write->threads_count_lock); \
- } \
- } \
- LKU(x->mutex); \
- } while (0)
-
# endif
#endif
{
Ecore_Thread_Cb func_heavy;
Ecore_Thread_Notify_Cb func_notify;
- Ecore_Pipe *notify;
- Ecore_Pipe *direct_pipe;
Ecore_Pthread_Worker *direct_worker;
int send;
int received;
} feedback_run;
+ struct {
+ Ecore_Thread_Cb func_main;
+ Ecore_Thread_Notify_Cb func_notify;
+
+ Ecore_Pipe *send;
+ Ecore_Pthread_Worker *direct_worker;
+
+ struct {
+ int send;
+ int received;
+ } from, to;
+ } message_run;
} u;
Ecore_Thread_Cb func_cancel;
const void *data;
- Eina_Bool cancel : 1;
- Eina_Bool feedback_run : 1;
- Eina_Bool kill : 1;
- Eina_Bool reschedule : 1;
- Eina_Bool no_queue : 1;
+ volatile int cancel;
+
+#ifdef EFL_HAVE_THREADS
+ LK(cancel_mutex);
+#endif
+
+ Eina_Bool message_run : 1;
+ Eina_Bool feedback_run : 1;
+ Eina_Bool kill : 1;
+ Eina_Bool reschedule : 1;
+ Eina_Bool no_queue : 1;
};
#ifdef EFL_HAVE_THREADS
typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
-
struct _Ecore_Pthread_Data
{
Ecore_Pthread_Worker *death_job;
- Ecore_Pipe *p;
void *data;
PH(thread);
};
+
+typedef struct _Ecore_Pthread_Notify Ecore_Pthread_Notify;
+struct _Ecore_Pthread_Notify
+{
+ Ecore_Pthread_Worker *work;
+ const void *user_data;
+};
+
+typedef void *(*Ecore_Thread_Sync_Cb)(void* data, Ecore_Thread *thread);
+
+typedef struct _Ecore_Pthread_Message Ecore_Pthread_Message;
+struct _Ecore_Pthread_Message
+{
+ union {
+ Ecore_Thread_Cb async;
+ Ecore_Thread_Sync_Cb sync;
+ } u;
+
+ const void *data;
+
+ int code;
+
+ Eina_Bool callback : 1;
+ Eina_Bool sync : 1;
+};
+
#endif
static int _ecore_thread_count_max = 0;
-static int ECORE_THREAD_PIPE_DEL = 0;
-static Eina_Array *_ecore_thread_pipe = NULL;
#ifdef EFL_HAVE_THREADS
-static void _ecore_thread_handler(void *data __UNUSED__,
- void *buffer,
- unsigned int nbyte);
-
-static Ecore_Pipe *
-_ecore_thread_pipe_get(void)
-{
- if (eina_array_count(_ecore_thread_pipe) > 0)
- return eina_array_pop(_ecore_thread_pipe);
-
- return ecore_pipe_add(_ecore_thread_handler, NULL);
-}
+static void _ecore_thread_handler(void *data);
static int _ecore_thread_count = 0;
-static Ecore_Event_Handler *del_handler = NULL;
static Eina_List *_ecore_active_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads_feedback = NULL;
}
static void
-_ecore_thread_pipe_free(void *data __UNUSED__,
- void *event)
-{
- Ecore_Pipe *p = event;
-
- if (eina_array_count(_ecore_thread_pipe) < 50)
- eina_array_push(_ecore_thread_pipe, p);
- else
- ecore_pipe_del(p);
- eina_threads_shutdown();
-}
-
-static Eina_Bool
-_ecore_thread_pipe_del(void *data __UNUSED__,
- int type __UNUSED__,
- void *event __UNUSED__)
-{
- /* This is a hack to delay pipe destruction until we are out of its internal loop. */
- return ECORE_CALLBACK_CANCEL;
-}
-
-static void
_ecore_thread_end(Ecore_Pthread_Data *pth,
Ecore_Thread *work)
{
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work;
- Ecore_Pipe *p;
- if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue))
+ if (!worker->message_run || !worker->feedback_run || (worker->feedback_run && !worker->no_queue))
_ecore_thread_count--;
- if (PHJ(pth->thread, p) != 0)
+ if (PHJ(pth->thread) != 0)
return;
if (eina_list_count(_ecore_pending_job_threads) > 0
INF("spawning threads because of still pending jobs.");
pth->death_job = _ecore_thread_worker_new();
- if (!pth->p || !pth->death_job) goto end;
+ if (!pth->death_job) goto end;
eina_threads_init();
_ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
- ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
free(pth);
}
if (work->feedback_run)
{
- ecore_pipe_del(work->u.feedback_run.notify);
-
- if (work->u.feedback_run.direct_pipe)
- eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
if (work->u.feedback_run.direct_worker)
_ecore_thread_worker_free(work->u.feedback_run.direct_worker);
}
}
static void
-_ecore_thread_handler(void *data __UNUSED__,
- void *buffer,
- unsigned int nbyte)
+_ecore_thread_handler(void *data)
{
- Ecore_Pthread_Worker *work;
-
- if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
-
- work = *(Ecore_Pthread_Worker **)buffer;
+ Ecore_Pthread_Worker *work = data;
if (work->feedback_run)
{
}
static void
-_ecore_notify_handler(void *data,
- void *buffer,
- unsigned int nbyte)
+_ecore_nothing_handler(void *data __UNUSED__, void *buffer __UNUSED__, unsigned int nbyte __UNUSED__)
{
- Ecore_Pthread_Worker *work = data;
- void *user_data;
+}
- if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
+static void
+_ecore_notify_handler(void *data)
+{
+ Ecore_Pthread_Notify *notify = data;
+ Ecore_Pthread_Worker *work = notify->work;
+ void *user_data = (void*) notify->user_data;
- user_data = *(void **)buffer;
work->u.feedback_run.received++;
if (work->u.feedback_run.func_notify)
}
static void
-_ecore_short_job(Ecore_Pipe *end_pipe,
- PH(thread))
+_ecore_message_notify_handler(void *data)
+{
+ Ecore_Pthread_Notify *notify = data;
+ Ecore_Pthread_Worker *work = notify->work;
+ Ecore_Pthread_Message *user_data = (void *) notify->user_data;
+ Eina_Bool delete = EINA_TRUE;
+
+ work->u.message_run.from.received++;
+
+ if (!user_data->callback)
+ {
+ if (work->u.message_run.func_notify)
+ work->u.message_run.func_notify((void *) work->data, (Ecore_Thread *) work, (void *) user_data->data);
+ }
+ else
+ {
+ if (user_data->sync)
+ {
+ user_data->data = user_data->u.sync((void*) user_data->data, (Ecore_Thread *) work);
+ user_data->callback = EINA_FALSE;
+ user_data->code = INT_MAX;
+ ecore_pipe_write(work->u.message_run.send, &user_data, sizeof (Ecore_Pthread_Message *));
+
+ delete = EINA_FALSE;
+ }
+ else
+ {
+ user_data->u.async((void*) user_data->data, (Ecore_Thread *) work);
+ }
+ }
+
+ if (delete)
+ {
+ free(user_data);
+ }
+
+ /* notify is malloc()ed by the sender (see ecore_thread_feedback()) and
+ ownership passes to this handler; free it to avoid leaking one
+ Ecore_Pthread_Notify per message. TODO confirm no caller hands us a
+ non-heap notify. */
+ free(notify);
+
+ /* Force reading all notify event before killing the thread */
+ if (work->kill && work->u.message_run.from.send == work->u.message_run.from.received)
+ {
+ _ecore_thread_kill(work);
+ }
+}
+
+static void
+_ecore_short_job(PH(thread))
{
Ecore_Pthread_Worker *work;
while (_ecore_pending_job_threads)
{
+ int cancel;
+
LKL(_ecore_pending_job_threads_mutex);
if (!_ecore_pending_job_threads)
LKU(_ecore_pending_job_threads_mutex);
+ LKL(work->cancel_mutex);
+ cancel = work->cancel;
+ LKU(work->cancel_mutex);
work->self = thread;
- if (!work->cancel)
- work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work);
+ if (!cancel)
+ work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
if (work->reschedule)
{
}
else
{
- ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
+ ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
}
}
}
static void
-_ecore_feedback_job(Ecore_Pipe *end_pipe,
- PH(thread))
+_ecore_feedback_job(PH(thread))
{
Ecore_Pthread_Worker *work;
while (_ecore_pending_job_threads_feedback)
{
+ int cancel;
+
LKL(_ecore_pending_job_threads_mutex);
if (!_ecore_pending_job_threads_feedback)
LKU(_ecore_pending_job_threads_mutex);
+ LKL(work->cancel_mutex);
+ cancel = work->cancel;
+ LKU(work->cancel_mutex);
work->self = thread;
- if (!work->cancel)
- work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);
+ if (!cancel)
+ work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
if (work->reschedule)
{
}
else
{
- ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
+ ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
}
}
}
pth = malloc(sizeof (Ecore_Pthread_Data));
if (!pth) return NULL;
- pth->p = work->u.feedback_run.direct_pipe;
- if (!pth->p)
- {
- free(pth);
- return NULL;
- }
pth->thread = PHS();
work->self = pth->thread;
- work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);
+ if (work->message_run)
+ work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work);
+ else
+ work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
- ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
+ ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
- work = work->u.feedback_run.direct_worker;
+ if (work->message_run)
+ work = work->u.message_run.direct_worker;
+ else
+ work = work->u.feedback_run.direct_worker;
if (!work)
{
free(pth);
work->func_cancel = NULL;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
+ work->message_run = EINA_FALSE;
+ work->no_queue = EINA_FALSE;
work->kill = EINA_FALSE;
work->hash = NULL;
- CDI(work->cond);
+ LKI(work->cancel_mutex);
LKI(work->mutex);
+ CDI(work->cond, work->mutex);
- ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
+ ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
- return pth->p;
+ return NULL;
}
static void *
eina_sched_prio_drop();
restart:
- if (_ecore_pending_job_threads) _ecore_short_job(pth->p, pth->thread);
- if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
+ if (_ecore_pending_job_threads) _ecore_short_job(pth->thread);
+ if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->thread);
/* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
work->func_cancel = NULL;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
+ work->message_run = EINA_FALSE;
work->kill = EINA_FALSE;
+ work->no_queue = EINA_FALSE;
work->hash = NULL;
- CDI(work->cond);
+ LKI(work->cancel_mutex);
LKI(work->mutex);
+ CDI(work->cond, work->mutex);
- ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
+ ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
- return pth->p;
+ return NULL;
}
#endif
if (_ecore_thread_count_max <= 0)
_ecore_thread_count_max = 1;
- ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
- _ecore_thread_pipe = eina_array_new(8);
-
#ifdef EFL_HAVE_THREADS
- del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
-
LKI(_ecore_pending_job_threads_mutex);
LRWKI(_ecore_thread_global_hash_lock);
LKI(_ecore_thread_global_hash_mutex);
- CDI(_ecore_thread_global_hash_cond);
+ CDI(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex);
#endif
}
_ecore_thread_shutdown(void)
{
/* FIXME: If function are still running in the background, should we kill them ? */
- Ecore_Pipe *p;
- Eina_Array_Iterator it;
- unsigned int i;
-
#ifdef EFL_HAVE_THREADS
Ecore_Pthread_Worker *work;
Ecore_Pthread_Data *pth;
EINA_LIST_FREE(_ecore_pending_job_threads, work)
{
if (work->func_cancel)
- work->func_cancel((void *)work->data, (Ecore_Thread *)work);
+ work->func_cancel((void *)work->data, (Ecore_Thread *) work);
free(work);
}
EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
{
if (work->func_cancel)
- work->func_cancel((void *)work->data, (Ecore_Thread *)work);
+ work->func_cancel((void *)work->data, (Ecore_Thread *) work);
free(work);
}
LKU(_ecore_pending_job_threads_mutex);
- /* Improve emergency shutdown */
+ /* FIXME: Improve emergency shutdown, now that we use async call, we can do something */
EINA_LIST_FREE(_ecore_active_job_threads, pth)
{
- Ecore_Pipe *ep;
-
PHA(pth->thread);
- PHJ(pth->thread, ep);
-
- ecore_pipe_del(pth->p);
+ PHJ(pth->thread);
}
if (_ecore_thread_global_hash)
eina_hash_free(_ecore_thread_global_hash);
- _ecore_event_handler_del(del_handler);
have_main_loop_thread = 0;
- del_handler = NULL;
+
+ while ((work = eina_trash_pop(&_ecore_thread_worker_trash)))
+ {
+ free(work);
+ }
LKD(_ecore_pending_job_threads_mutex);
LRWKD(_ecore_thread_global_hash_lock);
LKD(_ecore_thread_global_hash_mutex);
CDD(_ecore_thread_global_hash_cond);
#endif
-
- EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
- ecore_pipe_del(p);
-
- eina_array_free(_ecore_thread_pipe);
- _ecore_thread_pipe = NULL;
}
void
work->func_cancel = func_cancel;
work->cancel = EINA_FALSE;
work->feedback_run = EINA_FALSE;
+ work->message_run = EINA_FALSE;
work->kill = EINA_FALSE;
work->reschedule = EINA_FALSE;
+ work->no_queue = EINA_FALSE;
work->data = data;
#ifdef EFL_HAVE_THREADS
+ LKI(work->cancel_mutex);
+
work->self = 0;
work->hash = NULL;
- CDI(work->cond);
LKI(work->mutex);
+ CDI(work->cond, work->mutex);
LKL(_ecore_pending_job_threads_mutex);
_ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
pth = malloc(sizeof (Ecore_Pthread_Data));
if (!pth) goto on_error;
- pth->p = _ecore_thread_pipe_get();
pth->death_job = _ecore_thread_worker_new();
- if (!pth->p || !pth->death_job) goto on_error;
+ if (!pth->death_job) goto on_error;
eina_threads_init();
on_error:
if (pth)
{
- if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
free(pth);
}
LKU(_ecore_pending_job_threads_mutex);
if (work->func_cancel)
- work->func_cancel((void *)work->data, (Ecore_Thread *)work);
+ work->func_cancel((void *) work->data, (Ecore_Thread *) work);
+
+ CDD(work->cond);
+ LKD(work->mutex);
+ LKD(work->cancel_mutex);
free(work);
work = NULL;
}
ecore_thread_cancel(Ecore_Thread *thread)
{
#ifdef EFL_HAVE_THREADS
- Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
+ Ecore_Pthread_Worker *volatile work = (Ecore_Pthread_Worker *)thread;
Eina_List *l;
+ int cancel;
if (!work)
return EINA_TRUE;
- if (work->cancel)
+ LKL(work->cancel_mutex);
+ cancel = work->cancel;
+ LKU(work->cancel_mutex);
+ if (cancel)
return EINA_FALSE;
if (work->feedback_run)
LKU(_ecore_pending_job_threads_mutex);
+ work = (Ecore_Pthread_Worker *)thread;
+
/* Delay the destruction */
-on_exit:
- ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
+ on_exit:
+ LKL(work->cancel_mutex);
+ work->cancel = EINA_TRUE;
+ LKU(work->cancel_mutex);
+
return EINA_FALSE;
#else
(void) thread;
EAPI Eina_Bool
ecore_thread_check(Ecore_Thread *thread)
{
- Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
+ Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *) thread;
+ int cancel;
if (!worker) return EINA_TRUE;
- return worker->cancel;
+#ifdef EFL_HAVE_THREADS
+ LKL(worker->cancel_mutex);
+#endif
+ cancel = worker->cancel;
+ /* FIXME: there is an insane bug driving me nuts here. I don't know if
+ it's a race condition, some cache issue or some alien attack on our software.
+ But ecore_thread_check will only work correctly with a printf, all the volatile,
+ lock and even usleep don't help here... */
+ /* fprintf(stderr, "wc: %i\n", cancel); */
+#ifdef EFL_HAVE_THREADS
+ LKU(worker->cancel_mutex);
+#endif
+ return cancel;
}
EAPI Ecore_Thread *
worker->u.feedback_run.func_heavy = func_heavy;
worker->u.feedback_run.func_notify = func_notify;
worker->hash = NULL;
- CDI(worker->cond);
LKI(worker->mutex);
+ CDI(worker->cond, worker->mutex);
worker->func_cancel = func_cancel;
worker->func_end = func_end;
worker->data = data;
+ LKI(worker->cancel_mutex);
worker->cancel = EINA_FALSE;
+ worker->message_run = EINA_FALSE;
worker->feedback_run = EINA_TRUE;
worker->kill = EINA_FALSE;
worker->reschedule = EINA_FALSE;
worker->u.feedback_run.send = 0;
worker->u.feedback_run.received = 0;
- worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
- worker->u.feedback_run.direct_pipe = NULL;
worker->u.feedback_run.direct_worker = NULL;
if (!try_no_queue)
{
PH(t);
- worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
worker->no_queue = EINA_TRUE;
if (PHC(t, _ecore_direct_worker, worker) == 0)
return (Ecore_Thread *)worker;
+ if (worker->u.feedback_run.direct_worker)
+ {
+ _ecore_thread_worker_free(worker->u.feedback_run.direct_worker);
+ worker->u.feedback_run.direct_worker = NULL;
+ }
+
eina_threads_shutdown();
}
pth = malloc(sizeof (Ecore_Pthread_Data));
if (!pth) goto on_error;
- pth->p = _ecore_thread_pipe_get();
pth->death_job = _ecore_thread_worker_new();
- if (!pth->p || !pth->death_job) goto on_error;
+ if (!pth->death_job) goto on_error;
eina_threads_init();
on_error:
if (pth)
{
- if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
free(pth);
}
if (worker)
{
- ecore_pipe_del(worker->u.feedback_run.notify);
+ CDD(worker->cond);
+ LKD(worker->mutex);
free(worker);
worker = NULL;
}
worker.data = data;
worker.cancel = EINA_FALSE;
worker.feedback_run = EINA_TRUE;
+ worker.message_run = EINA_FALSE;
worker.kill = EINA_FALSE;
do {
Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
if (!worker) return EINA_FALSE;
- if (!worker->feedback_run) return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
if (!PHE(worker->self, PHS())) return EINA_FALSE;
- worker->u.feedback_run.send++;
- ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
+ if (worker->feedback_run)
+ {
+ Ecore_Pthread_Notify *notify;
+
+ notify = malloc(sizeof (Ecore_Pthread_Notify));
+ if (!notify) return EINA_FALSE;
+
+ notify->user_data = data;
+ notify->work = worker;
+ worker->u.feedback_run.send++;
+
+ ecore_main_loop_thread_safe_call_async(_ecore_notify_handler, notify);
+ }
+ else if (worker->message_run)
+ {
+ Ecore_Pthread_Message *msg;
+ Ecore_Pthread_Notify *notify;
+
+ msg = malloc(sizeof (Ecore_Pthread_Message)); /* sizeof the struct, not the pointer — sizeof(Ecore_Pthread_Message*) only allocates pointer-size bytes and the field writes below overflow the allocation */
+ if (!msg) return EINA_FALSE;
+ msg->data = data;
+ msg->callback = EINA_FALSE;
+ msg->sync = EINA_FALSE;
+
+ notify = malloc(sizeof (Ecore_Pthread_Notify));
+ if (!notify)
+ {
+ free(msg);
+ return EINA_FALSE;
+ }
+ notify->work = worker;
+ notify->user_data = msg;
+
+ worker->u.message_run.from.send++;
+ ecore_main_loop_thread_safe_call_async(_ecore_message_notify_handler, notify);
+ }
+ else
+ return EINA_FALSE;
return EINA_TRUE;
#else
#endif
}
+#if 0
+EAPI Ecore_Thread *
+ecore_thread_message_run(Ecore_Thread_Cb func_main,
+ Ecore_Thread_Notify_Cb func_notify,
+ Ecore_Thread_Cb func_end,
+ Ecore_Thread_Cb func_cancel,
+ const void *data)
+{
+#ifdef EFL_HAVE_THREADS
+ Ecore_Pthread_Worker *worker;
+ PH(t);
+
+ if (!func_main) return NULL;
+
+ worker = _ecore_thread_worker_new();
+ if (!worker) return NULL;
+
+ worker->u.message_run.func_main = func_main;
+ worker->u.message_run.func_notify = func_notify;
+ worker->u.message_run.direct_worker = _ecore_thread_worker_new();
+ worker->u.message_run.send = ecore_pipe_add(_ecore_nothing_handler, worker);
+ worker->u.message_run.from.send = 0;
+ worker->u.message_run.from.received = 0;
+ worker->u.message_run.to.send = 0;
+ worker->u.message_run.to.received = 0;
+
+ ecore_pipe_freeze(worker->u.message_run.send);
+
+ worker->func_cancel = func_cancel;
+ worker->func_end = func_end;
+ worker->hash = NULL;
+ LKI(worker->mutex);
+ CDI(worker->cond, worker->mutex);
+ worker->data = data;
+
+ LKI(worker->cancel_mutex);
+ worker->cancel = EINA_FALSE;
+ worker->message_run = EINA_TRUE;
+ worker->feedback_run = EINA_FALSE;
+ worker->kill = EINA_FALSE;
+ worker->reschedule = EINA_FALSE;
+ worker->no_queue = EINA_FALSE;
+ worker->self = 0;
+
+ eina_threads_init();
+
+ if (PHC(t, _ecore_direct_worker, worker) == 0)
+ return (Ecore_Thread*) worker;
+
+ eina_threads_shutdown();
+
+ if (worker->u.message_run.direct_worker) _ecore_thread_worker_free(worker->u.message_run.direct_worker);
+ if (worker->u.message_run.send) ecore_pipe_del(worker->u.message_run.send);
+
+ CDD(worker->cond);
+ LKD(worker->mutex);
+#else
+ /* Note: This type of thread can't and never will work without thread support */
+ WRN("ecore_thread_message_run called, but threads disable in Ecore, things will go wrong. Starting now !");
+# warning "You disabled threads support in ecore, I hope you know what you are doing !"
+#endif
+
+ if (func_cancel) func_cancel((void *) data, NULL); /* func_cancel may be NULL — every other call site guards it (cf. "if (work->func_cancel)") */
+
+ return NULL;
+}
+#endif
+
EAPI Eina_Bool
ecore_thread_reschedule(Ecore_Thread *thread)
{
while (1)
{
-#ifndef _WIN32
- struct timespec t = { 0, 0 };
-
- t.tv_sec = (long int)tm;
- t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
-#else
- struct timeval t = { 0, 0 };
-
- t.tv_sec = (long int)tm;
- t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
-#endif
LRWKRL(_ecore_thread_global_hash_lock);
ret = eina_hash_find(_ecore_thread_global_hash, key);
LRWKU(_ecore_thread_global_hash_lock);
if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
break;
LKL(_ecore_thread_global_hash_mutex);
- CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
+ /* NOTE(review): eina_condition_timedwait() takes a RELATIVE timeout in
+ seconds, but tm is an absolute deadline here (it is compared against
+ ecore_time_get() above), so for seconds > 0 this waits far longer than
+ intended; consider tm - ecore_time_get(). TODO confirm tm's value when
+ seconds < 0 before changing. */
+ CDW(_ecore_thread_global_hash_cond, tm);
LKU(_ecore_thread_global_hash_mutex);
}
if (ret) return ret->data;