* ecore: add ecore_long_run facility with notify to main loop.
authorcedric <cedric@7cbeb6ba-43b4-40fd-8cce-4c39aea84d33>
Wed, 30 Jun 2010 13:25:28 +0000 (13:25 +0000)
committercedric <cedric@7cbeb6ba-43b4-40fd-8cce-4c39aea84d33>
Wed, 30 Jun 2010 13:25:28 +0000 (13:25 +0000)
git-svn-id: svn+ssh://svn.enlightenment.org/var/svn/e/trunk/ecore@49948 7cbeb6ba-43b4-40fd-8cce-4c39aea84d33

src/lib/ecore/Ecore.h
src/lib/ecore/ecore_thread.c

index 610b1af..4e514b9 100644 (file)
@@ -336,7 +336,15 @@ extern "C" {
    EAPI void         ecore_pipe_read_close(Ecore_Pipe *p);
 
    EAPI Ecore_Thread *ecore_thread_run(void (*func_heavy)(void *data), void (*func_end)(void *data), void (*func_cancel)(void *data), const void *data);
+   EAPI Ecore_Thread *ecore_long_run(void (*func_heavy)(Ecore_Thread *thread, void *data),
+                                    void (*func_notify)(Ecore_Thread *thread, void *data),
+                                    void (*func_end)(void *data),
+                                    void (*func_cancel)(void *data),
+                                    const void *data,
+                                    Eina_Bool try_no_queue);
    EAPI Eina_Bool     ecore_thread_cancel(Ecore_Thread *thread);
+   EAPI Eina_Bool     ecore_thread_check(Ecore_Thread *thread);
+   EAPI Eina_Bool     ecore_thread_notify(Ecore_Thread *thread, void *data);
 
    EAPI double ecore_time_get(void);
    EAPI double ecore_loop_time_get(void);
index 561d0cc..8a9229d 100644 (file)
 #include "Ecore.h"
 #include "ecore_private.h"
 
-#ifdef EFL_HAVE_PTHREAD
 typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
-typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
 typedef struct _Ecore_Pthread Ecore_Pthread;
 
 struct _Ecore_Pthread_Worker
 {
-   void (*func_heavy)(void *data);
-   void (*func_end)(void *data);
+   union {
+      struct {
+        void (*func_heavy)(void *data);
+      } short_run;
+      struct {
+        void (*func_heavy)(Ecore_Thread *thread, void *data);
+        void (*func_notify)(Ecore_Thread *thread, void *data);
+
+        Ecore_Pipe *notify;
+
+#ifdef EFL_HAVE_PTHREAD
+        pthread_t self;
+#endif
+      } long_run;
+   } u;
+
    void (*func_cancel)(void *data);
+   void (*func_end)(void *data);
 
    const void *data;
 
    Eina_Bool cancel : 1;
+   Eina_Bool long_run : 1;
 };
 
+#ifdef EFL_HAVE_PTHREAD
+typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
+
 struct _Ecore_Pthread_Data
 {
    Ecore_Pipe *p;
+   void *data;
    pthread_t thread;
 };
 #endif
@@ -41,8 +59,9 @@ static int ECORE_THREAD_PIPE_DEL = 0;
 
 #ifdef EFL_HAVE_PTHREAD
 static int _ecore_thread_count = 0;
-static Eina_List *_ecore_thread_data = NULL;
 static Eina_List *_ecore_thread = NULL;
+static Eina_List *_ecore_thread_data = NULL;
+static Eina_List *_ecore_long_thread_data = NULL;
 static Ecore_Event_Handler *del_handler = NULL;
 
 static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -75,19 +94,47 @@ _ecore_thread_end(Ecore_Pthread_Data *pth)
    ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
 }
 
-static void *
-_ecore_thread_worker(Ecore_Pthread_Data *pth)
+static void
+_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
 {
    Ecore_Pthread_Worker *work;
 
-   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
-   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+   if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ;
 
-   pthread_mutex_lock(&_mutex);
-   _ecore_thread_count++;
-   pthread_mutex_unlock(&_mutex);
+   work = *(Ecore_Pthread_Worker**)buffer;
 
- on_error:
+   if (work->cancel)
+     {
+       if (work->func_cancel)
+         work->func_cancel((void*) work->data);
+     }
+   else
+     {
+       if (work->func_end)
+         work->func_end((void*) work->data);
+     }
+
+   if (work->long_run) ecore_pipe_del(work->u.long_run.notify);
+   free(work);
+}
+
+static void
+_ecore_notify_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
+{
+   Ecore_Pthread_Worker *work;
+
+   if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ;
+
+   work = *(Ecore_Pthread_Worker**)buffer;
+
+   if (work->u.long_run.func_notify)
+     work->u.long_run.func_notify((Ecore_Thread *) work, (void*) work->data);
+}
+
+static void
+_ecore_short_job(Ecore_Pipe *end_pipe)
+{
+   Ecore_Pthread_Worker *work;
 
    while (_ecore_thread_data)
      {
@@ -104,56 +151,132 @@ _ecore_thread_worker(Ecore_Pthread_Data *pth)
 
        pthread_mutex_unlock(&_mutex);
 
-       work->func_heavy((void*) work->data);
+       work->u.short_run.func_heavy((void*) work->data);
 
-       ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
+       ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker*));
      }
+}
 
-   pthread_mutex_lock(&_mutex);
-   if (_ecore_thread_data)
+static void
+_ecore_long_job(Ecore_Pipe *end_pipe, pthread_t thread)
+{
+   Ecore_Pthread_Worker *work;
+
+   while (_ecore_long_thread_data)
      {
+       pthread_mutex_lock(&_mutex);
+
+       if (!_ecore_long_thread_data)
+         {
+            pthread_mutex_unlock(&_mutex);
+            break;
+         }
+
+       work = eina_list_data_get(_ecore_long_thread_data);
+       _ecore_long_thread_data = eina_list_remove_list(_ecore_long_thread_data, _ecore_long_thread_data);
+
        pthread_mutex_unlock(&_mutex);
-       goto on_error;
+
+       work->u.long_run.self = thread;
+       work->u.long_run.func_heavy((Ecore_Thread *) work, (void*) work->data);
+
+       ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker*));
      }
-   _ecore_thread_count--;
+}
 
-   pthread_mutex_unlock(&_mutex);
+static void *
+_ecore_direct_worker(Ecore_Pthread_Worker *work)
+{
+   Ecore_Pthread_Data *pth;
+
+   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
+
+   pth = malloc(sizeof (Ecore_Pthread_Data));
+   if (!pth) return NULL;
+
+   pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
+   if (!pth->p)
+     {
+       free(pth);
+       return NULL;
+     }
+   pth->thread = pthread_self();
+
+   work->u.long_run.self = pth->thread;
+   work->u.long_run.func_heavy((Ecore_Thread *) work, (void*) work->data);
+
+   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
 
    work = malloc(sizeof (Ecore_Pthread_Worker));
-   if (!work) return NULL;
+   if (!work)
+     {
+       ecore_pipe_del(pth->p);
+       free(pth);
+       return NULL;
+     }
 
    work->data = pth;
-   work->func_heavy = NULL;
+   work->u.short_run.func_heavy = NULL;
    work->func_end = (void*) _ecore_thread_end;
    work->func_cancel = NULL;
    work->cancel = EINA_FALSE;
+   work->long_run = EINA_FALSE;
 
    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
 
    return pth->p;
 }
 
-static void
-_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
+static void *
+_ecore_thread_worker(Ecore_Pthread_Data *pth)
 {
    Ecore_Pthread_Worker *work;
 
-   if (nbyte != sizeof (Ecore_Pthread_Worker*)) return ;
+   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
 
-   work = *(Ecore_Pthread_Worker**)buffer;
+   pthread_mutex_lock(&_mutex);
+   _ecore_thread_count++;
+   pthread_mutex_unlock(&_mutex);
 
-   if (work->cancel)
+ on_error:
+   if (_ecore_thread_data) _ecore_short_job(pth->p);
+   if (_ecore_long_thread_data) _ecore_long_job(pth->p, pth->thread);
+
+   /* FIXME: Check if there is long running task todo, and switch to long run handler. */
+
+   pthread_mutex_lock(&_mutex);
+   if (_ecore_thread_data)
      {
-       if (work->func_cancel)
-         work->func_cancel((void*) work->data);
+       pthread_mutex_unlock(&_mutex);
+       goto on_error;
      }
-   else
+   if (_ecore_long_thread_data)
      {
-       work->func_end((void*) work->data);
+       pthread_mutex_unlock(&_mutex);
+       goto on_error;
      }
 
-   free(work);
+   _ecore_thread_count--;
+
+   pthread_mutex_unlock(&_mutex);
+
+   work = malloc(sizeof (Ecore_Pthread_Worker));
+   if (!work) return NULL;
+
+   work->data = pth;
+   work->u.short_run.func_heavy = NULL;
+   work->func_end = (void*) _ecore_thread_end;
+   work->func_cancel = NULL;
+   work->cancel = EINA_FALSE;
+   work->long_run = EINA_FALSE;
+
+   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker*));
+
+   return pth->p;
 }
+
 #endif
 
 void
@@ -223,7 +346,9 @@ ecore_thread_run(void (*func_heavy)(void *data),
 {
 #ifdef EFL_HAVE_PTHREAD
    Ecore_Pthread_Worker *work;
-   Ecore_Pthread_Data *pth;
+   Ecore_Pthread_Data *pth = NULL;
+
+   if (!func_heavy) return NULL;
 
    work = malloc(sizeof (Ecore_Pthread_Worker));
    if (!work)
@@ -232,10 +357,11 @@ ecore_thread_run(void (*func_heavy)(void *data),
        return NULL;
      }
 
-   work->func_heavy = func_heavy;
+   work->u.short_run.func_heavy = func_heavy;
    work->func_end = func_end;
    work->func_cancel = func_cancel;
    work->cancel = EINA_FALSE;
+   work->long_run = EINA_FALSE;
    work->data = data;
 
    pthread_mutex_lock(&_mutex);
@@ -251,22 +377,29 @@ ecore_thread_run(void (*func_heavy)(void *data),
 
    /* One more thread could be created. */
    pth = malloc(sizeof (Ecore_Pthread_Data));
-   if (!pth)
-     goto on_error;
+   if (!pth) goto on_error;
 
    pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
+   if (!pth->p) goto on_error;
 
    if (pthread_create(&pth->thread, NULL, (void*) _ecore_thread_worker, pth) == 0)
      return (Ecore_Thread*) work;
 
  on_error:
+   if (pth)
+     {
+       if (pth->p) ecore_pipe_del(pth->p);
+       free(pth);
+     }
+
    if (_ecore_thread_count == 0)
      {
        if (work->func_cancel)
          work->func_cancel((void*) work->data);
        free(work);
+       work = NULL;
      }
-   return NULL;
+   return (Ecore_Thread*) work;
 #else
    /*
      If no thread and as we don't want to break app that rely on this
@@ -321,3 +454,136 @@ ecore_thread_cancel(Ecore_Thread *thread)
    return EINA_TRUE;
 #endif
 }
+
+EAPI Eina_Bool
+ecore_thread_check(Ecore_Thread *thread)
+{
+   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread;
+
+   if (!worker) return EINA_FALSE;
+   return worker->cancel;
+}
+
+EAPI Ecore_Thread *
+ecore_long_run(void (*func_heavy)(Ecore_Thread *thread, void *data),
+              void (*func_notify)(Ecore_Thread *thread, void *data),
+              void (*func_end)(void *data),
+              void (*func_cancel)(void *data),
+              const void *data,
+              Eina_Bool try_no_queue)
+{
+
+#ifdef EFL_HAVE_PTHREAD
+   Ecore_Pthread_Worker *worker;
+   Ecore_Pthread_Data *pth = NULL;
+
+   if (!func_heavy) return NULL;
+
+   worker = malloc(sizeof (Ecore_Pthread_Worker));
+   if (!worker) goto on_error;
+
+   worker->u.long_run.func_heavy = func_heavy;
+   worker->u.long_run.func_notify = func_notify;
+   worker->func_cancel = func_cancel;
+   worker->func_end = func_end;
+   worker->data = data;
+   worker->cancel = EINA_FALSE;
+   worker->long_run = EINA_TRUE;
+
+   worker->u.long_run.notify = ecore_pipe_add(_ecore_notify_handler, NULL);
+
+   if (!try_no_queue)
+     {
+       pthread_t t;
+
+       if (pthread_create(&t, NULL, (void*) _ecore_direct_worker, worker) == 0)
+         return (Ecore_Thread*) worker;
+     }
+
+   pthread_mutex_lock(&_mutex);
+   _ecore_long_thread_data = eina_list_append(_ecore_long_thread_data, worker);
+
+   if (_ecore_thread_count == _ecore_thread_count_max)
+     {
+       pthread_mutex_unlock(&_mutex);
+       return (Ecore_Thread*) worker;
+     }
+
+   pthread_mutex_unlock(&_mutex);
+
+   /* One more thread could be created. */
+   pth = malloc(sizeof (Ecore_Pthread_Data));
+   if (!pth) goto on_error;
+
+   pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
+   if (pth->p) goto on_error;
+
+   if (pthread_create(&pth->thread, NULL, (void*) _ecore_thread_worker, pth) == 0)
+     return (Ecore_Thread*) worker;
+
+ on_error:
+   if (pth)
+     {
+       if (pth->p) ecore_pipe_del(pth->p);
+       free(pth);
+     }
+
+   if (_ecore_thread_count == 0)
+     {
+       if (func_cancel) func_cancel((void*) data);
+
+       if (worker)
+         {
+            ecore_pipe_del(worker->u.long_run.notify);
+            free(worker);
+            worker = NULL;
+         }
+     }
+
+   return (Ecore_Thread*) worker;
+#else
+   Ecore_Pthread_Worker worker;
+
+   /*
+     If no thread and as we don't want to break app that rely on this
+     facility, we will lock the interface until we are done.
+    */
+   worker.u.long_run.func_heavy = func_heavy;
+   worker.u.long_run.func_notify = func_notify;
+   worker.u.long_run.notify = NULL;
+   worker.func_cancel = func_cancel;
+   worker.func_end = func_end;
+   worker.data = data;
+   worker.cancel = EINA_FALSE;
+   worker.long_run = EINA_TRUE;
+
+   func_heavy((Ecore_Thread*) &worker, data);
+
+   if (worker.cancel) func_cancel(data);
+   else func_end(data);
+
+   return NULL;
+#endif
+}
+
+EAPI Eina_Bool
+ecore_thread_notify(Ecore_Thread *thread, void *data)
+{
+   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread;
+
+   if (!worker) return EINA_FALSE;
+   if (!worker->long_run) return EINA_FALSE;
+
+#ifdef EFL_HAVE_PTHREAD
+   if (worker->u.long_run.self != pthread_self()) return EINA_FALSE;
+
+   ecore_pipe_write(worker->u.long_run.notify, data, sizeof (void*));
+
+   return EINA_TRUE;
+#else
+   worker->u.long_run.func_notify(thread, data);
+
+   return EINA_TRUE;
+#endif
+}
+