ecore: rewrite of Ecore_Thread internal to use Eina_Lock and ecore_main_loop_thread_s...
authorcedric <cedric@7cbeb6ba-43b4-40fd-8cce-4c39aea84d33>
Mon, 20 Feb 2012 15:57:18 +0000 (15:57 +0000)
committercedric <cedric@7cbeb6ba-43b4-40fd-8cce-4c39aea84d33>
Mon, 20 Feb 2012 15:57:18 +0000 (15:57 +0000)
NOTES: It is now safer and faster. I doubt I will have more time before the release to finish
ecore_thread_message_run, nor to make the shutdown nicer.

git-svn-id: svn+ssh://svn.enlightenment.org/var/svn/e/trunk/ecore@68164 7cbeb6ba-43b4-40fd-8cce-4c39aea84d33

ChangeLog
NEWS
src/lib/ecore/ecore_thread.c

index db8999c..070de1c 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
 
         * Improve callbacks in ecore_evas to use typedefs for readability.
 
+2012-02-20  Cedric Bail
+
+       * Rewrite internal of Ecore_Thread to use Eina_Lock and ecore_main_loop_thread_safe_call_async.
diff --git a/NEWS b/NEWS
index 05dc4f8..4217611 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -31,7 +31,9 @@ Improvements:
      - certificates can now be added for STARTTTLS
     * ecore_win32:
      - fix modifiers value on Windows XP
-
+    * ecore_thread:
+     - use eina_lock
+     - use Ecore thread safe async call
 
 Ecore 1.1.0
 
index 4444ad4..96b0b8a 100644 (file)
 
 #ifdef EFL_HAVE_THREADS
 
+# define LK(x) Eina_Lock x
+# define LKI(x) eina_lock_new(&(x))
+# define LKD(x) eina_lock_free(&(x))
+# define LKL(x) eina_lock_take(&(x))
+# define LKU(x) eina_lock_release(&(x))
+
+# define CD(x) Eina_Condition x
+# define CDI(x, m) eina_condition_new(&(x), &(m))
+# define CDD(x) eina_condition_free(&(x))
+# define CDB(x) eina_condition_broadcast(&(x))
+# define CDW(x, t) eina_condition_timedwait(&(x), t)
+
+# define LRWK(x) Eina_RWLock x
+# define LRWKI(x) eina_rwlock_new(&(x));
+# define LRWKD(x) eina_rwlock_free(&(x));
+# define LRWKWL(x) eina_rwlock_take_write(&(x));
+# define LRWKRL(x) eina_rwlock_take_read(&(x));
+# define LRWKU(x) eina_rwlock_release(&(x));
+
 # ifdef EFL_HAVE_POSIX_THREADS
 #  include <pthread.h>
 #  ifdef __linux__
 #  define PHE(x, y)    pthread_equal(x, y)
 #  define PHS()        pthread_self()
 #  define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d)
-#  define PHJ(x, p)    pthread_join(x, (void **)(&(p)))
+#  define PHJ(x)       pthread_join(x, NULL)
 #  define PHA(x)       pthread_cancel(x)
 
-#  define CD(x)        pthread_cond_t x
-#  define CDI(x)       pthread_cond_init(&(x), NULL);
-#  define CDD(x)       pthread_cond_destroy(&(x));
-#  define CDB(x)       pthread_cond_broadcast(&(x));
-#  define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
-
-#  define LK(x)        pthread_mutex_t x
-#  define LKI(x)       pthread_mutex_init(&(x), NULL);
-#  define LKD(x)       pthread_mutex_destroy(&(x));
-#  define LKL(x)       pthread_mutex_lock(&(x));
-#  define LKU(x)       pthread_mutex_unlock(&(x));
-
-#  define LRWK(x)      pthread_rwlock_t x
-#  define LRWKI(x)     pthread_rwlock_init(&(x), NULL);
-#  define LRWKD(x)     pthread_rwlock_destroy(&(x));
-#  define LRWKWL(x)    pthread_rwlock_wrlock(&(x));
-#  define LRWKRL(x)    pthread_rwlock_rdlock(&(x));
-#  define LRWKU(x)     pthread_rwlock_unlock(&(x));
-
 # else /* EFL_HAVE_WIN32_THREADS */
 
 #  define WIN32_LEAN_AND_MEAN
@@ -108,209 +108,9 @@ _ecore_thread_win32_join(win32_thread *x,
    return 0;
 }
 
-#  define PHJ(x, p) _ecore_thread_win32_join(x, (void **)(&(p)))
+#  define PHJ(x) _ecore_thread_win32_join(x, NULL)
 #  define PHA(x)    TerminateThread(x->thread, 0)
 
-#  define LK(x)     HANDLE x
-#  define LKI(x)    x = CreateMutex(NULL, FALSE, NULL)
-#  define LKD(x)    CloseHandle(x)
-#  define LKL(x)    WaitForSingleObject(x, INFINITE)
-#  define LKU(x)    ReleaseMutex(x)
-
-typedef struct
-{
-   HANDLE           semaphore;
-   LONG             threads_count;
-   CRITICAL_SECTION threads_count_lock;
-} win32_cond;
-
-#  define CD(x) win32_cond * x
-
-#  define CDI(x)                                                       \
-  do {                                                                 \
-       x = (win32_cond *)calloc(1, sizeof(win32_cond));                \
-       if (x)                                                          \
-         {                                                             \
-            x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
-            if (x->semaphore)                                          \
-              InitializeCriticalSection(&x->threads_count_lock);       \
-            else                                                       \
-              {                                                        \
-                 free(x);                                              \
-                 x = NULL;                                             \
-              }                                                        \
-         }                                                             \
-    } while (0)
-
-#  define CDD(x)                  \
-  do {                            \
-       CloseHandle(x->semaphore); \
-       free(x);                   \
-       x = NULL;                  \
-    } while (0)
-
-#  define CDB(x)                                                  \
-  do {                                                            \
-       EnterCriticalSection(&x->threads_count_lock);              \
-       if (x->threads_count > 0)                                  \
-         ReleaseSemaphore(x->semaphore, x->threads_count, NULL);  \
-       LeaveCriticalSection (&x->threads_count_lock);             \
-    } while (0)
-
-int
-_ecore_thread_win32_cond_timedwait(win32_cond     *c,
-                                   HANDLE         *external_mutex,
-                                   struct timeval *t)
-{
-   DWORD res;
-   DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
-   LKL(external_mutex);
-   EnterCriticalSection (&c->threads_count_lock);
-   c->threads_count++;
-   LeaveCriticalSection (&c->threads_count_lock);
-   LKU(external_mutex);
-   res = WaitForSingleObject(c->semaphore, val);
-   if (res == WAIT_OBJECT_0)
-     return 0;
-   else
-     return -1;
-}
-
-#  define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
-
-typedef struct
-{
-   LONG readers_count;
-   LONG writers_count;
-   int  readers;
-   int  writers;
-   LK(mutex);
-   CD(cond_read);
-   CD(cond_write);
-} win32_rwl;
-
-#  define LRWK(x) win32_rwl * x
-#  define LRWKI(x)                                    \
-  do {                                                \
-       x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
-       if (x)                                         \
-         {                                            \
-            LKI(x->mutex);                            \
-            if (x->mutex)                             \
-              {                                       \
-                 CDI(x->cond_read);                   \
-                 if (x->cond_read)                    \
-                   {                                  \
-                      CDI(x->cond_write);             \
-                      if (!x->cond_write)             \
-                        {                             \
-                           CDD(x->cond_read);         \
-                           LKD(x->mutex);             \
-                           free(x);                   \
-                           x = NULL;                  \
-                        }                             \
-                   }                                  \
-                 else                                 \
-                   {                                  \
-                      LKD(x->mutex);                  \
-                      free(x);                        \
-                      x = NULL;                       \
-                   }                                  \
-              }                                       \
-            else                                      \
-              {                                       \
-                 free(x);                             \
-                 x = NULL;                            \
-              }                                       \
-         }                                            \
-    } while (0)
-
-#  define LRWKD(x)         \
-  do {                     \
-       LKU(x->mutex);      \
-       LKD(x->mutex);      \
-       CDD(x->cond_write); \
-       CDD(x->cond_read);  \
-       free(x);            \
-    } while (0)
-#  define LRWKWL(x)                                                             \
-  do {                                                                          \
-       DWORD res;                                                               \
-       LKU(x->mutex);                                                           \
-       if (x->writers || x->readers > 0)                                        \
-         {                                                                      \
-            x->writers_count++;                                                 \
-            while (x->writers || x->readers > 0)                                \
-              {                                                                 \
-                 EnterCriticalSection(&x->cond_write->threads_count_lock);      \
-                 x->cond_read->threads_count++;                                 \
-                 LeaveCriticalSection(&x->cond_write->threads_count_lock);      \
-                 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
-                 if (res != WAIT_OBJECT_0) break;                               \
-              }                                                                 \
-            x->writers_count--;                                                 \
-         }                                                                      \
-       if (res == 0) x->writers_count = 1;                                      \
-       LKU(x->mutex);                                                           \
-    } while (0)
-#  define LRWKRL(x)                                                             \
-  do {                                                                          \
-       DWORD res;                                                               \
-       LKL(x->mutex);                                                           \
-       if (x->writers)                                                          \
-         {                                                                      \
-            x->readers_count++;                                                 \
-            while (x->writers)                                                  \
-              {                                                                 \
-                 EnterCriticalSection(&x->cond_write->threads_count_lock);      \
-                 x->cond_read->threads_count++;                                 \
-                 LeaveCriticalSection(&x->cond_write->threads_count_lock);      \
-                 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
-                 if (res != WAIT_OBJECT_0) break;                               \
-              }                                                                 \
-            x->readers_count--;                                                 \
-         }                                                                      \
-       if (res == 0)                                                            \
-         x->readers++;                                                          \
-       LKU(x->mutex);                                                           \
-    } while (0)
-#  define LRWKU(x)                                                          \
-  do {                                                                      \
-       LKL(x->mutex);                                                       \
-       if (x->writers)                                                      \
-         {                                                                  \
-            x->writers = 0;                                                 \
-            if (x->readers_count == 1)                                      \
-              {                                                             \
-                 EnterCriticalSection(&x->cond_read->threads_count_lock);   \
-                 if (x->cond_read->threads_count > 0)                       \
-                   ReleaseSemaphore(x->cond_read->semaphore, 1, 0);         \
-                 LeaveCriticalSection(&x->cond_read->threads_count_lock);   \
-              }                                                             \
-            else if (x->readers_count > 0)                                  \
-              CDB(x->cond_read);                                            \
-            else if (x->writers_count > 0)                                  \
-              {                                                             \
-                 EnterCriticalSection (&x->cond_write->threads_count_lock); \
-                 if (x->cond_write->threads_count > 0)                      \
-                   ReleaseSemaphore(x->cond_write->semaphore, 1, 0);        \
-                 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
-              }                                                             \
-         }                                                                  \
-       else if (x->readers > 0)                                             \
-         {                                                                  \
-            x->readers--;                                                   \
-            if (x->readers == 0 && x->writers_count > 0)                    \
-              {                                                             \
-                 EnterCriticalSection (&x->cond_write->threads_count_lock); \
-                 if (x->cond_write->threads_count > 0)                      \
-                   ReleaseSemaphore(x->cond_write->semaphore, 1, 0);        \
-                 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
-              }                                                             \
-         }                                                                  \
-       LKU(x->mutex);                                                       \
-    } while (0)
-
 # endif
 
 #endif
@@ -336,14 +136,24 @@ struct _Ecore_Pthread_Worker
       {
          Ecore_Thread_Cb        func_heavy;
          Ecore_Thread_Notify_Cb func_notify;
-         Ecore_Pipe            *notify;
 
-         Ecore_Pipe            *direct_pipe;
          Ecore_Pthread_Worker  *direct_worker;
 
          int                    send;
          int                    received;
       } feedback_run;
+      struct {
+         Ecore_Thread_Cb func_main;
+         Ecore_Thread_Notify_Cb func_notify;
+
+         Ecore_Pipe            *send;
+         Ecore_Pthread_Worker  *direct_worker;
+
+         struct {
+            int send;
+            int received;
+         } from, to;
+      } message_run;
    } u;
 
    Ecore_Thread_Cb func_cancel;
@@ -357,47 +167,63 @@ struct _Ecore_Pthread_Worker
 
    const void     *data;
 
-   Eina_Bool       cancel : 1;
-   Eina_Bool       feedback_run : 1;
-   Eina_Bool       kill : 1;
-   Eina_Bool       reschedule : 1;
-   Eina_Bool       no_queue : 1;
+   volatile int cancel;
+
+#ifdef EFL_HAVE_THREADS
+   LK(cancel_mutex);
+#endif
+
+   Eina_Bool message_run : 1;
+   Eina_Bool feedback_run : 1;
+   Eina_Bool kill : 1;
+   Eina_Bool reschedule : 1;
+   Eina_Bool no_queue : 1;
 };
 
 #ifdef EFL_HAVE_THREADS
 typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
-
 struct _Ecore_Pthread_Data
 {
    Ecore_Pthread_Worker *death_job;
-   Ecore_Pipe           *p;
    void                 *data;
                          PH(thread);
 };
+
+typedef struct _Ecore_Pthread_Notify Ecore_Pthread_Notify;
+struct _Ecore_Pthread_Notify
+{
+   Ecore_Pthread_Worker *work;
+   const void *user_data;
+};
+
+typedef void *(*Ecore_Thread_Sync_Cb)(void* data, Ecore_Thread *thread);
+
+typedef struct _Ecore_Pthread_Message Ecore_Pthread_Message;
+struct _Ecore_Pthread_Message
+{
+   union {
+      Ecore_Thread_Cb async;
+      Ecore_Thread_Sync_Cb sync;
+   } u;
+
+   const void *data;
+
+   int code;
+
+   Eina_Bool callback : 1;
+   Eina_Bool sync : 1;
+};
+
 #endif
 
 static int _ecore_thread_count_max = 0;
-static int ECORE_THREAD_PIPE_DEL = 0;
-static Eina_Array *_ecore_thread_pipe = NULL;
 
 #ifdef EFL_HAVE_THREADS
 
-static void _ecore_thread_handler(void        *data __UNUSED__,
-                                  void        *buffer,
-                                  unsigned int nbyte);
-
-static Ecore_Pipe *
-_ecore_thread_pipe_get(void)
-{
-   if (eina_array_count(_ecore_thread_pipe) > 0)
-     return eina_array_pop(_ecore_thread_pipe);
-
-   return ecore_pipe_add(_ecore_thread_handler, NULL);
-}
+static void _ecore_thread_handler(void *data);
 
 static int _ecore_thread_count = 0;
 
-static Ecore_Event_Handler *del_handler = NULL;
 static Eina_List *_ecore_active_job_threads = NULL;
 static Eina_List *_ecore_pending_job_threads = NULL;
 static Eina_List *_ecore_pending_job_threads_feedback = NULL;
@@ -454,38 +280,15 @@ _ecore_thread_data_free(void *data)
 }
 
 static void
-_ecore_thread_pipe_free(void *data __UNUSED__,
-                        void *event)
-{
-   Ecore_Pipe *p = event;
-
-   if (eina_array_count(_ecore_thread_pipe) < 50)
-     eina_array_push(_ecore_thread_pipe, p);
-   else
-     ecore_pipe_del(p);
-   eina_threads_shutdown();
-}
-
-static Eina_Bool
-_ecore_thread_pipe_del(void *data __UNUSED__,
-                       int   type __UNUSED__,
-                       void *event __UNUSED__)
-{
-   /* This is a hack to delay pipe destruction until we are out of its internal loop. */
-    return ECORE_CALLBACK_CANCEL;
-}
-
-static void
 _ecore_thread_end(Ecore_Pthread_Data *pth,
                   Ecore_Thread       *work)
 {
    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work;
-   Ecore_Pipe *p;
 
-   if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue))
+   if (!worker->message_run || !worker->feedback_run || (worker->feedback_run && !worker->no_queue))
      _ecore_thread_count--;
 
-   if (PHJ(pth->thread, p) != 0)
+   if (PHJ(pth->thread) != 0)
      return;
 
    if (eina_list_count(_ecore_pending_job_threads) > 0
@@ -496,7 +299,7 @@ _ecore_thread_end(Ecore_Pthread_Data *pth,
          INF("spawning threads because of still pending jobs.");
 
          pth->death_job = _ecore_thread_worker_new();
-         if (!pth->p || !pth->death_job) goto end;
+         if (!pth->death_job) goto end;
 
          eina_threads_init();
 
@@ -514,7 +317,6 @@ end:
 
    _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
 
-   ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
    free(pth);
 }
 
@@ -534,10 +336,6 @@ _ecore_thread_kill(Ecore_Pthread_Worker *work)
 
    if (work->feedback_run)
      {
-        ecore_pipe_del(work->u.feedback_run.notify);
-
-        if (work->u.feedback_run.direct_pipe)
-          eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
         if (work->u.feedback_run.direct_worker)
           _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
      }
@@ -549,15 +347,9 @@ _ecore_thread_kill(Ecore_Pthread_Worker *work)
 }
 
 static void
-_ecore_thread_handler(void        *data __UNUSED__,
-                      void        *buffer,
-                      unsigned int nbyte)
+_ecore_thread_handler(void *data)
 {
-   Ecore_Pthread_Worker *work;
-
-   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
-
-   work = *(Ecore_Pthread_Worker **)buffer;
+   Ecore_Pthread_Worker *work = data;
 
    if (work->feedback_run)
      {
@@ -572,16 +364,17 @@ _ecore_thread_handler(void        *data __UNUSED__,
 }
 
 static void
-_ecore_notify_handler(void        *data,
-                      void        *buffer,
-                      unsigned int nbyte)
+_ecore_nothing_handler(void *data __UNUSED__, void *buffer __UNUSED__, unsigned int nbyte __UNUSED__)
 {
-   Ecore_Pthread_Worker *work = data;
-   void *user_data;
+}
 
-   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
+static void
+_ecore_notify_handler(void *data)
+{
+   Ecore_Pthread_Notify *notify = data;
+   Ecore_Pthread_Worker *work = notify->work;
+   void *user_data = (void*) notify->user_data;
 
-   user_data = *(void **)buffer;
    work->u.feedback_run.received++;
 
    if (work->u.feedback_run.func_notify)
@@ -595,13 +388,58 @@ _ecore_notify_handler(void        *data,
 }
 
 static void
-_ecore_short_job(Ecore_Pipe *end_pipe,
-                PH(thread))
+_ecore_message_notify_handler(void *data)
+{
+   Ecore_Pthread_Notify *notify = data;
+   Ecore_Pthread_Worker *work = notify->work;
+   Ecore_Pthread_Message *user_data = (void *) notify->user_data;
+   Eina_Bool delete = EINA_TRUE;
+
+   work->u.message_run.from.received++;
+
+   if (!user_data->callback)
+     {
+        if (work->u.message_run.func_notify)
+          work->u.message_run.func_notify((void *) work->data, (Ecore_Thread *) work, (void *) user_data->data);
+     }
+   else
+     {
+        if (user_data->sync)
+          {
+             user_data->data = user_data->u.sync((void*) user_data->data, (Ecore_Thread *) work);
+             user_data->callback = EINA_FALSE;
+             user_data->code = INT_MAX;
+             ecore_pipe_write(work->u.message_run.send, &user_data, sizeof (Ecore_Pthread_Message *));
+
+             delete = EINA_FALSE;
+          }
+        else
+          {
+             user_data->u.async((void*) user_data->data, (Ecore_Thread *) work);
+          }
+     }
+
+   if (delete)
+     {
+        free(user_data);
+     }
+
+   /* Force reading all notify event before killing the thread */
+   if (work->kill && work->u.message_run.from.send == work->u.message_run.from.received)
+     {
+        _ecore_thread_kill(work);
+     }
+}
+
+static void
+_ecore_short_job(PH(thread))
 {
    Ecore_Pthread_Worker *work;
 
    while (_ecore_pending_job_threads)
      {
+        int cancel;
+
         LKL(_ecore_pending_job_threads_mutex);
 
         if (!_ecore_pending_job_threads)
@@ -616,9 +454,12 @@ _ecore_short_job(Ecore_Pipe *end_pipe,
 
         LKU(_ecore_pending_job_threads_mutex);
 
+        LKL(work->cancel_mutex);
+        cancel = work->cancel;
+        LKU(work->cancel_mutex);
         work->self = thread;
-        if (!work->cancel)
-          work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work);
+        if (!cancel)
+          work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
 
         if (work->reschedule)
           {
@@ -630,19 +471,20 @@ _ecore_short_job(Ecore_Pipe *end_pipe,
           }
         else
           {
-             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
+             ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
           }
      }
 }
 
 static void
-_ecore_feedback_job(Ecore_Pipe *end_pipe,
-                    PH(thread))
+_ecore_feedback_job(PH(thread))
 {
    Ecore_Pthread_Worker *work;
 
    while (_ecore_pending_job_threads_feedback)
      {
+        int cancel;
+
         LKL(_ecore_pending_job_threads_mutex);
 
         if (!_ecore_pending_job_threads_feedback)
@@ -657,9 +499,12 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe,
 
         LKU(_ecore_pending_job_threads_mutex);
 
+        LKL(work->cancel_mutex);
+        cancel = work->cancel;
+        LKU(work->cancel_mutex);
         work->self = thread;
-        if (!work->cancel)
-          work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);
+        if (!cancel)
+          work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
 
         if (work->reschedule)
           {
@@ -671,7 +516,7 @@ _ecore_feedback_job(Ecore_Pipe *end_pipe,
           }
         else
           {
-             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
+             ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
           }
      }
 }
@@ -691,20 +536,20 @@ _ecore_direct_worker(Ecore_Pthread_Worker *work)
    pth = malloc(sizeof (Ecore_Pthread_Data));
    if (!pth) return NULL;
 
-   pth->p = work->u.feedback_run.direct_pipe;
-   if (!pth->p)
-     {
-        free(pth);
-        return NULL;
-     }
    pth->thread = PHS();
 
    work->self = pth->thread;
-   work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);
+   if (work->message_run)
+     work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work);
+   else
+     work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
 
-   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
+   ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
 
-   work = work->u.feedback_run.direct_worker;
+   if (work->message_run)
+     work = work->u.message_run.direct_worker;
+   else
+     work = work->u.feedback_run.direct_worker;
    if (!work)
      {
         free(pth);
@@ -717,14 +562,17 @@ _ecore_direct_worker(Ecore_Pthread_Worker *work)
    work->func_cancel = NULL;
    work->cancel = EINA_FALSE;
    work->feedback_run = EINA_FALSE;
+   work->message_run = EINA_FALSE;
+   work->no_queue = EINA_FALSE;
    work->kill = EINA_FALSE;
    work->hash = NULL;
-   CDI(work->cond);
+   LKI(work->cancel_mutex);
    LKI(work->mutex);
+   CDI(work->cond, work->mutex);
 
-   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
+   ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
 
-   return pth->p;
+   return NULL;
 }
 
 static void *
@@ -740,8 +588,8 @@ _ecore_thread_worker(Ecore_Pthread_Data *pth)
    eina_sched_prio_drop();
 
 restart:
-   if (_ecore_pending_job_threads) _ecore_short_job(pth->p, pth->thread);
-   if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
+   if (_ecore_pending_job_threads) _ecore_short_job(pth->thread);
+   if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->thread);
 
    /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
 
@@ -777,14 +625,17 @@ restart:
    work->func_cancel = NULL;
    work->cancel = EINA_FALSE;
    work->feedback_run = EINA_FALSE;
+   work->message_run = EINA_FALSE;
    work->kill = EINA_FALSE;
+   work->no_queue = EINA_FALSE;
    work->hash = NULL;
-   CDI(work->cond);
+   LKI(work->cancel_mutex);
    LKI(work->mutex);
+   CDI(work->cond, work->mutex);
 
-   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
+   ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work);
 
-   return pth->p;
+   return NULL;
 }
 
 #endif
@@ -813,16 +664,11 @@ _ecore_thread_init(void)
    if (_ecore_thread_count_max <= 0)
      _ecore_thread_count_max = 1;
 
-   ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
-   _ecore_thread_pipe = eina_array_new(8);
-
 #ifdef EFL_HAVE_THREADS
-   del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
-
    LKI(_ecore_pending_job_threads_mutex);
    LRWKI(_ecore_thread_global_hash_lock);
    LKI(_ecore_thread_global_hash_mutex);
-   CDI(_ecore_thread_global_hash_cond);
+   CDI(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex);
 #endif
 }
 
@@ -830,10 +676,6 @@ void
 _ecore_thread_shutdown(void)
 {
    /* FIXME: If function are still running in the background, should we kill them ? */
-    Ecore_Pipe *p;
-    Eina_Array_Iterator it;
-    unsigned int i;
-
 #ifdef EFL_HAVE_THREADS
     Ecore_Pthread_Worker *work;
     Ecore_Pthread_Data *pth;
@@ -843,46 +685,39 @@ _ecore_thread_shutdown(void)
     EINA_LIST_FREE(_ecore_pending_job_threads, work)
       {
          if (work->func_cancel)
-           work->func_cancel((void *)work->data, (Ecore_Thread *)work);
+           work->func_cancel((void *)work->data, (Ecore_Thread *) work);
          free(work);
       }
 
     EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
       {
          if (work->func_cancel)
-           work->func_cancel((void *)work->data, (Ecore_Thread *)work);
+           work->func_cancel((void *)work->data, (Ecore_Thread *) work);
          free(work);
       }
 
     LKU(_ecore_pending_job_threads_mutex);
 
-    /* Improve emergency shutdown */
+    /* FIXME: Improve emergency shutdown, now that we use async call, we can do something */
     EINA_LIST_FREE(_ecore_active_job_threads, pth)
       {
-         Ecore_Pipe *ep;
-
          PHA(pth->thread);
-         PHJ(pth->thread, ep);
-
-         ecore_pipe_del(pth->p);
+         PHJ(pth->thread);
       }
     if (_ecore_thread_global_hash)
       eina_hash_free(_ecore_thread_global_hash);
-    _ecore_event_handler_del(del_handler);
     have_main_loop_thread = 0;
-    del_handler = NULL;
+
+    while ((work = eina_trash_pop(&_ecore_thread_worker_trash)))
+      {
+         free(work);
+      }
 
     LKD(_ecore_pending_job_threads_mutex);
     LRWKD(_ecore_thread_global_hash_lock);
     LKD(_ecore_thread_global_hash_mutex);
     CDD(_ecore_thread_global_hash_cond);
 #endif
-
-    EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
-    ecore_pipe_del(p);
-
-    eina_array_free(_ecore_thread_pipe);
-    _ecore_thread_pipe = NULL;
 }
 
 void
@@ -927,15 +762,19 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking,
    work->func_cancel = func_cancel;
    work->cancel = EINA_FALSE;
    work->feedback_run = EINA_FALSE;
+   work->message_run = EINA_FALSE;
    work->kill = EINA_FALSE;
    work->reschedule = EINA_FALSE;
+   work->no_queue = EINA_FALSE;
    work->data = data;
 
 #ifdef EFL_HAVE_THREADS
+   LKI(work->cancel_mutex);
+
    work->self = 0;
    work->hash = NULL;
-   CDI(work->cond);
    LKI(work->mutex);
+   CDI(work->cond, work->mutex);
 
    LKL(_ecore_pending_job_threads_mutex);
    _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
@@ -952,9 +791,8 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking,
    pth = malloc(sizeof (Ecore_Pthread_Data));
    if (!pth) goto on_error;
 
-   pth->p = _ecore_thread_pipe_get();
    pth->death_job = _ecore_thread_worker_new();
-   if (!pth->p || !pth->death_job) goto on_error;
+   if (!pth->death_job) goto on_error;
 
    eina_threads_init();
 
@@ -969,7 +807,6 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking,
 on_error:
    if (pth)
      {
-        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
         if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
         free(pth);
      }
@@ -981,7 +818,11 @@ on_error:
         LKU(_ecore_pending_job_threads_mutex);
 
         if (work->func_cancel)
-          work->func_cancel((void *)work->data, (Ecore_Thread *)work);
+          work->func_cancel((void *) work->data, (Ecore_Thread *) work);
+
+        CDD(work->cond);
+        LKD(work->mutex);
+        LKD(work->cancel_mutex);
         free(work);
         work = NULL;
      }
@@ -1013,12 +854,16 @@ EAPI Eina_Bool
 ecore_thread_cancel(Ecore_Thread *thread)
 {
 #ifdef EFL_HAVE_THREADS
-   Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
+   Ecore_Pthread_Worker *volatile work = (Ecore_Pthread_Worker *)thread;
    Eina_List *l;
+   int cancel;
 
    if (!work)
      return EINA_TRUE;
-   if (work->cancel)
+   LKL(work->cancel_mutex);
+   cancel = work->cancel;
+   LKU(work->cancel_mutex);
+   if (cancel)
      return EINA_FALSE;
 
    if (work->feedback_run)
@@ -1070,9 +915,14 @@ ecore_thread_cancel(Ecore_Thread *thread)
 
    LKU(_ecore_pending_job_threads_mutex);
 
+   work = (Ecore_Pthread_Worker *)thread;
+
    /* Delay the destruction */
-on_exit:
-   ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
+ on_exit:
+   LKL(work->cancel_mutex);
+   work->cancel = EINA_TRUE;
+   LKU(work->cancel_mutex);
+
    return EINA_FALSE;
 #else
    (void) thread;
@@ -1083,10 +933,23 @@ on_exit:
 EAPI Eina_Bool
 ecore_thread_check(Ecore_Thread *thread)
 {
-   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
+   Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *) thread;
+   int cancel;
 
    if (!worker) return EINA_TRUE;
-   return worker->cancel;
+#ifdef EFL_HAVE_THREADS
+   LKL(worker->cancel_mutex);
+#endif
+   cancel = worker->cancel;
+   /* FIXME: there is an insane bug driving me nuts here. I don't know if
+    it's a race condition, some cache issue or some alien attack on our software.
+    But ecore_thread_check will only work correctly with a printf, all the volatile,
+    lock and even usleep don't help here... */
+   /* fprintf(stderr, "wc: %i\n", cancel); */
+#ifdef EFL_HAVE_THREADS
+   LKU(worker->cancel_mutex);
+#endif
+   return cancel;
 }
 
 EAPI Ecore_Thread *
@@ -1109,12 +972,14 @@ ecore_thread_feedback_run(Ecore_Thread_Cb        func_heavy,
    worker->u.feedback_run.func_heavy = func_heavy;
    worker->u.feedback_run.func_notify = func_notify;
    worker->hash = NULL;
-   CDI(worker->cond);
    LKI(worker->mutex);
+   CDI(worker->cond, worker->mutex);
    worker->func_cancel = func_cancel;
    worker->func_end = func_end;
    worker->data = data;
+   LKI(worker->cancel_mutex);
    worker->cancel = EINA_FALSE;
+   worker->message_run = EINA_FALSE;
    worker->feedback_run = EINA_TRUE;
    worker->kill = EINA_FALSE;
    worker->reschedule = EINA_FALSE;
@@ -1123,15 +988,12 @@ ecore_thread_feedback_run(Ecore_Thread_Cb        func_heavy,
    worker->u.feedback_run.send = 0;
    worker->u.feedback_run.received = 0;
 
-   worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
-   worker->u.feedback_run.direct_pipe = NULL;
    worker->u.feedback_run.direct_worker = NULL;
 
    if (!try_no_queue)
      {
         PH(t);
 
-        worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
         worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
         worker->no_queue = EINA_TRUE;
 
@@ -1140,6 +1002,12 @@ ecore_thread_feedback_run(Ecore_Thread_Cb        func_heavy,
         if (PHC(t, _ecore_direct_worker, worker) == 0)
           return (Ecore_Thread *)worker;
 
+        if (worker->u.feedback_run.direct_worker)
+          {
+             _ecore_thread_worker_free(worker->u.feedback_run.direct_worker);
+             worker->u.feedback_run.direct_worker = NULL;
+          }
+
         eina_threads_shutdown();
      }
 
@@ -1160,9 +1028,8 @@ ecore_thread_feedback_run(Ecore_Thread_Cb        func_heavy,
    pth = malloc(sizeof (Ecore_Pthread_Data));
    if (!pth) goto on_error;
 
-   pth->p = _ecore_thread_pipe_get();
    pth->death_job = _ecore_thread_worker_new();
-   if (!pth->p || !pth->death_job) goto on_error;
+   if (!pth->death_job) goto on_error;
 
    eina_threads_init();
 
@@ -1177,7 +1044,6 @@ ecore_thread_feedback_run(Ecore_Thread_Cb        func_heavy,
 on_error:
    if (pth)
      {
-        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
         if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
         free(pth);
      }
@@ -1193,7 +1059,8 @@ on_error:
 
         if (worker)
           {
-             ecore_pipe_del(worker->u.feedback_run.notify);
+             CDD(worker->cond);
+             LKD(worker->mutex);
              free(worker);
              worker = NULL;
           }
@@ -1219,6 +1086,7 @@ on_error:
    worker.data = data;
    worker.cancel = EINA_FALSE;
    worker.feedback_run = EINA_TRUE;
+   worker.message_run = EINA_FALSE;
    worker.kill = EINA_FALSE;
 
    do {
@@ -1241,13 +1109,48 @@ ecore_thread_feedback(Ecore_Thread *thread,
    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
 
    if (!worker) return EINA_FALSE;
-   if (!worker->feedback_run) return EINA_FALSE;
 
 #ifdef EFL_HAVE_THREADS
    if (!PHE(worker->self, PHS())) return EINA_FALSE;
 
-   worker->u.feedback_run.send++;
-   ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
+   if (worker->feedback_run)
+     {
+        Ecore_Pthread_Notify *notify;
+
+        notify = malloc(sizeof (Ecore_Pthread_Notify));
+        if (!notify) return EINA_FALSE;
+
+        notify->user_data = data;
+        notify->work = worker;
+        worker->u.feedback_run.send++;
+
+        ecore_main_loop_thread_safe_call_async(_ecore_notify_handler, notify);
+     }
+   else if (worker->message_run)
+     {
+        Ecore_Pthread_Message *msg;
+        Ecore_Pthread_Notify *notify;
+
+        msg = malloc(sizeof (Ecore_Pthread_Message*));
+        if (!msg) return EINA_FALSE;
+        msg->data = data;
+        msg->callback = EINA_FALSE;
+        msg->sync = EINA_FALSE;
+
+        notify = malloc(sizeof (Ecore_Pthread_Notify));
+        if (!notify)
+          {
+             free(msg);
+             return EINA_FALSE;
+          }
+        notify->work = worker;
+        notify->user_data = msg;
+
+        worker->u.message_run.from.send++;
+        ecore_main_loop_thread_safe_call_async(_ecore_message_notify_handler, notify);
+     }
+   else
+     return EINA_FALSE;
 
    return EINA_TRUE;
 #else
@@ -1257,6 +1160,74 @@ ecore_thread_feedback(Ecore_Thread *thread,
 #endif
 }
 
+#if 0
+EAPI Ecore_Thread *
+ecore_thread_message_run(Ecore_Thread_Cb func_main,
+                        Ecore_Thread_Notify_Cb func_notify,
+                        Ecore_Thread_Cb func_end,
+                        Ecore_Thread_Cb func_cancel,
+                        const void *data)
+{
+#ifdef EFL_HAVE_THREADS
+  Ecore_Pthread_Worker *worker;
+  PH(t);
+
+  if (!func_main) return NULL;
+
+  worker = _ecore_thread_worker_new();
+  if (!worker) return NULL;
+
+  worker->u.message_run.func_main = func_main;
+  worker->u.message_run.func_notify = func_notify;
+  worker->u.message_run.direct_worker = _ecore_thread_worker_new();
+  worker->u.message_run.send = ecore_pipe_add(_ecore_nothing_handler, worker);
+  worker->u.message_run.from.send = 0;
+  worker->u.message_run.from.received = 0;
+  worker->u.message_run.to.send = 0;
+  worker->u.message_run.to.received = 0;
+
+  ecore_pipe_freeze(worker->u.message_run.send);
+
+  worker->func_cancel = func_cancel;
+  worker->func_end = func_end;
+  worker->hash = NULL;
+  LKI(worker->mutex);
+  CDI(worker->cond, worker->mutex);
+  worker->data = data;
+
+  LKI(worker->cancel_mutex);
+  worker->cancel = EINA_FALSE;
+  worker->message_run = EINA_TRUE;
+  worker->feedback_run = EINA_FALSE;
+  worker->kill = EINA_FALSE;
+  worker->reschedule = EINA_FALSE;
+  worker->no_queue = EINA_FALSE;
+  worker->self = 0;
+
+  eina_threads_init();
+
+  if (PHC(t, _ecore_direct_worker, worker) == 0)
+    return (Ecore_Thread*) worker;
+
+  eina_threads_shutdown();
+
+  if (worker->u.message_run.direct_worker) _ecore_thread_worker_free(worker->u.message_run.direct_worker);
+  if (worker->u.message_run.send) ecore_pipe_del(worker->u.message_run.send);
+
+  CDD(worker->cond);
+  LKD(worker->mutex);
+#else
+  /* Note: This type of thread can't and never will work without thread support */
+  WRN("ecore_thread_message_run called, but threads disable in Ecore, things will go wrong. Starting now !");
+# warning "You disabled threads support in ecore, I hope you know what you are doing !"
+#endif
+
+  func_cancel((void *) data, NULL);
+
+  return NULL;
+}
+#endif
+
 EAPI Eina_Bool
 ecore_thread_reschedule(Ecore_Thread *thread)
 {
@@ -1641,24 +1612,13 @@ ecore_thread_global_data_wait(const char *key,
 
    while (1)
      {
-#ifndef _WIN32
-        struct timespec t = { 0, 0 };
-
-        t.tv_sec = (long int)tm;
-        t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
-#else
-        struct timeval t = { 0, 0 };
-
-        t.tv_sec = (long int)tm;
-        t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
-#endif
         LRWKRL(_ecore_thread_global_hash_lock);
         ret = eina_hash_find(_ecore_thread_global_hash, key);
         LRWKU(_ecore_thread_global_hash_lock);
         if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
           break;
         LKL(_ecore_thread_global_hash_mutex);
-        CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
+        CDW(_ecore_thread_global_hash_cond, tm);
         LKU(_ecore_thread_global_hash_mutex);
      }
    if (ret) return ret->data;