Migration with svn version @65618
[framework/uifw/ecore.git] / src / lib / ecore / ecore_thread.c
1 #ifdef HAVE_CONFIG_H
2 # include <config.h>
3 #endif
4
5 #include <sys/time.h>
6 #include <assert.h>
7 #include <sys/types.h>
8 #include <unistd.h>
9
10 #ifdef HAVE_EVIL
11 # include <Evil.h>
12 #endif
13
14 #include "Ecore.h"
15 #include "ecore_private.h"
16
17 #ifdef EFL_HAVE_THREADS
18
19 # ifdef EFL_HAVE_POSIX_THREADS
20 #  include <pthread.h>
21 #  ifdef __linux__
22 #   include <sched.h>
23 #   include <sys/resource.h>
24 #   include <unistd.h>
25 #   include <sys/syscall.h>
26 #   include <errno.h>
27 #  endif
28
29 #  define PH(x)        pthread_t x
30 #  define PHE(x, y)    pthread_equal(x, y)
31 #  define PHS()        pthread_self()
32 #  define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d)
33 #  define PHJ(x, p)    pthread_join(x, (void **)(&(p)))
34 #  define PHA(x)       pthread_cancel(x)
35
36 #  define CD(x)        pthread_cond_t x
37 #  define CDI(x)       pthread_cond_init(&(x), NULL);
38 #  define CDD(x)       pthread_cond_destroy(&(x));
39 #  define CDB(x)       pthread_cond_broadcast(&(x));
40 #  define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
41
42 #  define LK(x)        pthread_mutex_t x
43 #  define LKI(x)       pthread_mutex_init(&(x), NULL);
44 #  define LKD(x)       pthread_mutex_destroy(&(x));
45 #  define LKL(x)       pthread_mutex_lock(&(x));
46 #  define LKU(x)       pthread_mutex_unlock(&(x));
47
48 #  define LRWK(x)      pthread_rwlock_t x
49 #  define LRWKI(x)     pthread_rwlock_init(&(x), NULL);
50 #  define LRWKD(x)     pthread_rwlock_destroy(&(x));
51 #  define LRWKWL(x)    pthread_rwlock_wrlock(&(x));
52 #  define LRWKRL(x)    pthread_rwlock_rdlock(&(x));
53 #  define LRWKU(x)     pthread_rwlock_unlock(&(x));
54
55 # else /* EFL_HAVE_WIN32_THREADS */
56
57 #  define WIN32_LEAN_AND_MEAN
58 #  include <windows.h>
59 #  undef WIN32_LEAN_AND_MEAN
60
61 typedef struct
62 {
63    HANDLE thread;
64    void  *val;
65 } win32_thread;
66
67 #  define PH(x)     win32_thread * x
68 #  define PHE(x, y) ((x) == (y))
69 #  define PHS()     (HANDLE)GetCurrentThreadId()
70
71 int
72 _ecore_thread_win32_create(win32_thread         **x,
73                            LPTHREAD_START_ROUTINE f,
74                            void                  *d)
75 {
76    win32_thread *t;
77    t = (win32_thread *)calloc(1, sizeof(win32_thread));
78    if (!t)
79      return -1;
80
81    (t)->thread = CreateThread(NULL, 0, f, d, 0, NULL);
82    if (!t->thread)
83      {
84         free(t);
85         return -1;
86      }
87    t->val = d;
88    *x = t;
89
90    return 0;
91 }
92
93 #  define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)
94
95 int
96 _ecore_thread_win32_join(win32_thread *x,
97                          void        **res)
98 {
99    if (!PHE(x, PHS()))
100      {
101         WaitForSingleObject(x->thread, INFINITE);
102         CloseHandle(x->thread);
103      }
104    if (res) *res = x->val;
105    free(x);
106
107    return 0;
108 }
109
110 #  define PHJ(x, p) _ecore_thread_win32_join(x, (void **)(&(p)))
111 #  define PHA(x)    TerminateThread(x->thread, 0)
112
113 #  define LK(x)     HANDLE x
114 #  define LKI(x)    x = CreateMutex(NULL, FALSE, NULL)
115 #  define LKD(x)    CloseHandle(x)
116 #  define LKL(x)    WaitForSingleObject(x, INFINITE)
117 #  define LKU(x)    ReleaseMutex(x)
118
119 typedef struct
120 {
121    HANDLE           semaphore;
122    LONG             threads_count;
123    CRITICAL_SECTION threads_count_lock;
124 } win32_cond;
125
126 #  define CD(x) win32_cond * x
127
128 #  define CDI(x)                                                       \
129   do {                                                                 \
130        x = (win32_cond *)calloc(1, sizeof(win32_cond));                \
131        if (x)                                                          \
132          {                                                             \
133             x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
134             if (x->semaphore)                                          \
135               InitializeCriticalSection(&x->threads_count_lock);       \
136             else                                                       \
137               {                                                        \
138                  free(x);                                              \
139                  x = NULL;                                             \
140               }                                                        \
141          }                                                             \
142     } while (0)
143
144 #  define CDD(x)                  \
145   do {                            \
146        CloseHandle(x->semaphore); \
147        free(x);                   \
148        x = NULL;                  \
149     } while (0)
150
151 #  define CDB(x)                                                  \
152   do {                                                            \
153        EnterCriticalSection(&x->threads_count_lock);              \
154        if (x->threads_count > 0)                                  \
155          ReleaseSemaphore(x->semaphore, x->threads_count, NULL);  \
156        LeaveCriticalSection (&x->threads_count_lock);             \
157     } while (0)
158
/* Emulate pthread_cond_timedwait() on top of a Win32 semaphore:
 * register this thread as a waiter (under the condition's internal
 * critical section), then wait on the semaphore with a millisecond
 * timeout derived from 't'.  Returns 0 on wakeup, -1 on timeout/error.
 *
 * NOTE(review): unlike pthread_cond_timedwait, the external mutex is
 * locked and unlocked here *before* the wait and never re-acquired
 * afterwards, so a broadcast between LKU() and WaitForSingleObject()
 * can be missed.  Also 'external_mutex' is declared HANDLE* while the
 * LKL/LKU macros expect a plain HANDLE — confirm against callers. */
int
_ecore_thread_win32_cond_timedwait(win32_cond     *c,
                                   HANDLE         *external_mutex,
                                   struct timeval *t)
{
   DWORD res;
   DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);  /* timeout in ms */
   LKL(external_mutex);
   /* Bump the waiter count so CDB() knows how many releases to post. */
   EnterCriticalSection (&c->threads_count_lock);
   c->threads_count++;
   LeaveCriticalSection (&c->threads_count_lock);
   LKU(external_mutex);
   res = WaitForSingleObject(c->semaphore, val);
   if (res == WAIT_OBJECT_0)
     return 0;
   else
     return -1;
}
177
178 #  define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
179
/* Win32 emulation of a pthread read/write lock. */
typedef struct
{
   LONG readers_count; /* readers currently *waiting* for the lock */
   LONG writers_count; /* writers currently *waiting* for the lock */
   int  readers;       /* readers currently holding the lock */
   int  writers;       /* non-zero while a writer holds the lock */
   LK(mutex);          /* protects every field above */
   CD(cond_read);      /* signalled by LRWKU to wake waiting readers */
   CD(cond_write);     /* signalled by LRWKU to wake waiting writers */
} win32_rwl;
190
191 #  define LRWK(x) win32_rwl * x
192 #  define LRWKI(x)                                    \
193   do {                                                \
194        x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
195        if (x)                                         \
196          {                                            \
197             LKI(x->mutex);                            \
198             if (x->mutex)                             \
199               {                                       \
200                  CDI(x->cond_read);                   \
201                  if (x->cond_read)                    \
202                    {                                  \
203                       CDI(x->cond_write);             \
204                       if (!x->cond_write)             \
205                         {                             \
206                            CDD(x->cond_read);         \
207                            LKD(x->mutex);             \
208                            free(x);                   \
209                            x = NULL;                  \
210                         }                             \
211                    }                                  \
212                  else                                 \
213                    {                                  \
214                       LKD(x->mutex);                  \
215                       free(x);                        \
216                       x = NULL;                       \
217                    }                                  \
218               }                                       \
219             else                                      \
220               {                                       \
221                  free(x);                             \
222                  x = NULL;                            \
223               }                                       \
224          }                                            \
225     } while (0)
226
227 #  define LRWKD(x)         \
228   do {                     \
229        LKU(x->mutex);      \
230        LKD(x->mutex);      \
231        CDD(x->cond_write); \
232        CDD(x->cond_read);  \
233        free(x);            \
234     } while (0)
/* Acquire the write lock: wait until there is neither an active writer
 * nor any active reader, then set the writer-held flag.
 * Fixes vs the previous version: the mutex was *unlocked* (LKU) where it
 * must be locked (mirroring LRWKRL); 'res' was read uninitialized on the
 * uncontended path; the waiter bumped cond_read's counter although LRWKU
 * only releases cond_write->semaphore when cond_write->threads_count > 0;
 * and on success it set writers_count (the waiter counter) instead of the
 * 'writers' held-flag that LRWKU clears on unlock. */
#  define LRWKWL(x)                                                             \
  do {                                                                          \
       DWORD res = WAIT_OBJECT_0; /* success when no waiting was needed */      \
       LKL(x->mutex);                                                           \
       if (x->writers || x->readers > 0)                                        \
         {                                                                      \
            x->writers_count++;                                                 \
            while (x->writers || x->readers > 0)                                \
              {                                                                 \
                 EnterCriticalSection(&x->cond_write->threads_count_lock);      \
                 x->cond_write->threads_count++;                                \
                 LeaveCriticalSection(&x->cond_write->threads_count_lock);      \
                 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
                 if (res != WAIT_OBJECT_0) break;                               \
              }                                                                 \
            x->writers_count--;                                                 \
         }                                                                      \
       if (res == WAIT_OBJECT_0) x->writers = 1;                                \
       LKU(x->mutex);                                                           \
    } while (0)
/* Acquire a read lock: wait while a writer holds the lock, then count
 * ourselves as a reader.
 * Fixes vs the previous version: 'res' was read uninitialized when no
 * writer was active, so x->readers++ could be skipped on the uncontended
 * path; and readers waited on cond_write->semaphore although LRWKU wakes
 * readers through cond_read (ReleaseSemaphore/CDB on cond_read), which
 * could deadlock waiting readers forever. */
#  define LRWKRL(x)                                                             \
  do {                                                                          \
       DWORD res = WAIT_OBJECT_0; /* success when no waiting was needed */      \
       LKL(x->mutex);                                                           \
       if (x->writers)                                                          \
         {                                                                      \
            x->readers_count++;                                                 \
            while (x->writers)                                                  \
              {                                                                 \
                 EnterCriticalSection(&x->cond_read->threads_count_lock);       \
                 x->cond_read->threads_count++;                                 \
                 LeaveCriticalSection(&x->cond_read->threads_count_lock);       \
                 res = WaitForSingleObject(x->cond_read->semaphore, INFINITE);  \
                 if (res != WAIT_OBJECT_0) break;                               \
              }                                                                 \
            x->readers_count--;                                                 \
         }                                                                      \
       if (res == WAIT_OBJECT_0)                                                \
         x->readers++;                                                          \
       LKU(x->mutex);                                                           \
    } while (0)
276 #  define LRWKU(x)                                                          \
277   do {                                                                      \
278        LKL(x->mutex);                                                       \
279        if (x->writers)                                                      \
280          {                                                                  \
281             x->writers = 0;                                                 \
282             if (x->readers_count == 1)                                      \
283               {                                                             \
284                  EnterCriticalSection(&x->cond_read->threads_count_lock);   \
285                  if (x->cond_read->threads_count > 0)                       \
286                    ReleaseSemaphore(x->cond_read->semaphore, 1, 0);         \
287                  LeaveCriticalSection(&x->cond_read->threads_count_lock);   \
288               }                                                             \
289             else if (x->readers_count > 0)                                  \
290               CDB(x->cond_read);                                            \
291             else if (x->writers_count > 0)                                  \
292               {                                                             \
293                  EnterCriticalSection (&x->cond_write->threads_count_lock); \
294                  if (x->cond_write->threads_count > 0)                      \
295                    ReleaseSemaphore(x->cond_write->semaphore, 1, 0);        \
296                  LeaveCriticalSection (&x->cond_write->threads_count_lock); \
297               }                                                             \
298          }                                                                  \
299        else if (x->readers > 0)                                             \
300          {                                                                  \
301             x->readers--;                                                   \
302             if (x->readers == 0 && x->writers_count > 0)                    \
303               {                                                             \
304                  EnterCriticalSection (&x->cond_write->threads_count_lock); \
305                  if (x->cond_write->threads_count > 0)                      \
306                    ReleaseSemaphore(x->cond_write->semaphore, 1, 0);        \
307                  LeaveCriticalSection (&x->cond_write->threads_count_lock); \
308               }                                                             \
309          }                                                                  \
310        LKU(x->mutex);                                                       \
311     } while (0)
312
313 # endif
314
315 #endif
316
317 typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
318 typedef struct _Ecore_Pthread        Ecore_Pthread;
319 typedef struct _Ecore_Thread_Data    Ecore_Thread_Data;
320
321 struct _Ecore_Thread_Data
322 {
323    void        *data;
324    Eina_Free_Cb cb;
325 };
326
/* One queued or running job.  State is shared between the worker thread
 * executing it and the main loop that created it / reaps it. */
struct _Ecore_Pthread_Worker
{
   union {
      struct
      {
         Ecore_Thread_Cb func_blocking;  /* run once in a pool thread */
      } short_run;
      struct
      {
         Ecore_Thread_Cb        func_heavy;   /* long-running worker function */
         Ecore_Thread_Notify_Cb func_notify;  /* main-loop callback per message */
         Ecore_Pipe            *notify;       /* worker -> main-loop message pipe */

         Ecore_Pipe            *direct_pipe;   /* completion pipe for no-queue jobs */
         Ecore_Pthread_Worker  *direct_worker; /* death job for the dedicated thread */

         int                    send;     /* messages written by the worker */
         int                    received; /* messages delivered to the main loop */
      } feedback_run;
   } u;

   Ecore_Thread_Cb func_cancel; /* main-loop callback when the job is cancelled */
   Ecore_Thread_Cb func_end;    /* main-loop callback on normal completion */
#ifdef EFL_HAVE_THREADS
                   PH(self);   /* id of the thread currently running the job */
   Eina_Hash      *hash;       /* per-thread key/value data — presumably the
                                * ecore_thread_local_data store; confirm */
                   CD(cond);   /* condition paired with 'mutex' below */
                   LK(mutex);  /* protects cross-thread access to this worker */
#endif

   const void     *data; /* user data handed to every callback */

   Eina_Bool       cancel : 1;       /* cancellation was requested */
   Eina_Bool       feedback_run : 1; /* uses u.feedback_run (else u.short_run) */
   Eina_Bool       kill : 1;         /* destroy once pending notifies drain */
   Eina_Bool       reschedule : 1;   /* re-queue the job after it returns */
   Eina_Bool       no_queue : 1;     /* runs on a dedicated thread, not the pool */
};
365
366 #ifdef EFL_HAVE_THREADS
367 typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
368
/* Bookkeeping for one worker thread of the pool (or a dedicated
 * no-queue thread). */
struct _Ecore_Pthread_Data
{
   Ecore_Pthread_Worker *death_job; /* pre-allocated job used to tell the
                                     * main loop this thread is exiting */
   Ecore_Pipe           *p;         /* pipe the main loop watches for results */
   void                 *data;      /* not touched in this file — TODO confirm users */
                         PH(thread); /* the thread's id */
};
376 #endif
377
378 static int _ecore_thread_count_max = 0;
379 static int ECORE_THREAD_PIPE_DEL = 0;
380 static Eina_Array *_ecore_thread_pipe = NULL;
381
382 #ifdef EFL_HAVE_THREADS
383
384 static void _ecore_thread_handler(void        *data __UNUSED__,
385                                   void        *buffer,
386                                   unsigned int nbyte);
387
388 static Ecore_Pipe *
389 _ecore_thread_pipe_get(void)
390 {
391    if (eina_array_count_get(_ecore_thread_pipe) > 0)
392      return eina_array_pop(_ecore_thread_pipe);
393
394    return ecore_pipe_add(_ecore_thread_handler, NULL);
395 }
396
397 static int _ecore_thread_count = 0;
398
399 static Ecore_Event_Handler *del_handler = NULL;
400 static Eina_List *_ecore_active_job_threads = NULL;
401 static Eina_List *_ecore_pending_job_threads = NULL;
402 static Eina_List *_ecore_pending_job_threads_feedback = NULL;
403 static LK(_ecore_pending_job_threads_mutex);
404
405 static Eina_Hash *_ecore_thread_global_hash = NULL;
406 static LRWK(_ecore_thread_global_hash_lock);
407 static LK(_ecore_thread_global_hash_mutex);
408 static CD(_ecore_thread_global_hash_cond);
409
410 static Eina_Bool have_main_loop_thread = 0;
411
412 static Eina_Trash *_ecore_thread_worker_trash = NULL;
413 static int _ecore_thread_worker_count = 0;
414
415 static void                 *_ecore_thread_worker(Ecore_Pthread_Data *pth);
416 static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);
417
/* Return the id of the main loop thread.  The cached value is refreshed
 * whenever the pid changes, so after a fork() the first caller in the
 * child is adopted as the new main loop thread.
 * NOTE(review): not itself thread-safe — relies on the first call after
 * a pid change happening on the main loop thread; confirm with callers. */
static PH(get_main_loop_thread) (void)
{
   static PH(main_loop_thread);
   static pid_t main_loop_pid;
   pid_t pid = getpid();

   if (pid != main_loop_pid)
     {
        /* First call, or we are in a forked child: adopt the calling
         * thread as the main loop thread. */
        main_loop_pid = pid;
        main_loop_thread = PHS();
        have_main_loop_thread = 1;
     }

   return main_loop_thread;
}
433
434 static void
435 _ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
436 {
437    if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
438      {
439         free(worker);
440         return;
441      }
442
443    eina_trash_push(&_ecore_thread_worker_trash, worker);
444 }
445
446 static void
447 _ecore_thread_data_free(void *data)
448 {
449    Ecore_Thread_Data *d = data;
450
451    if (d->cb) d->cb(d->data);
452    free(d);
453 }
454
455 static void
456 _ecore_thread_pipe_free(void *data __UNUSED__,
457                         void *event)
458 {
459    Ecore_Pipe *p = event;
460
461    if (eina_array_count_get(_ecore_thread_pipe) < 50)
462      eina_array_push(_ecore_thread_pipe, p);
463    else
464      ecore_pipe_del(p);
465    eina_threads_shutdown();
466 }
467
/* Event handler for ECORE_THREAD_PIPE_DEL.  Intentionally a no-op: the
 * event only exists so that the pipe is destroyed by the event's free
 * callback (_ecore_thread_pipe_free) after ecore has left the pipe's
 * internal dispatch loop. */
static Eina_Bool
_ecore_thread_pipe_del(void *data __UNUSED__,
                       int   type __UNUSED__,
                       void *event __UNUSED__)
{
   /* This is a hack to delay pipe destruction until we are out of its internal loop. */
   return ECORE_CALLBACK_CANCEL;
}
476
477 static void
478 _ecore_thread_end(Ecore_Pthread_Data *pth,
479                   Ecore_Thread       *work)
480 {
481    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work;
482    Ecore_Pipe *p;
483
484    if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue))
485      _ecore_thread_count--;
486
487    if (PHJ(pth->thread, p) != 0)
488      return;
489
490    if (eina_list_count(_ecore_pending_job_threads) > 0
491        && (unsigned int)_ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
492        && _ecore_thread_count < _ecore_thread_count_max)
493      {
494         /* One more thread should be created. */
495          INF("spawning threads because of still pending jobs.");
496
497          pth->death_job = _ecore_thread_worker_new();
498          if (!pth->p || !pth->death_job) goto end;
499
500          eina_threads_init();
501
502          if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
503            {
504               _ecore_thread_count++;
505               return;
506            }
507
508          eina_threads_shutdown();
509
510 end:
511          if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
512      }
513
514    _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
515
516    ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
517    free(pth);
518 }
519
/* Final, main-loop-side destruction of a worker: invoke the cancel or
 * end callback, release the feedback resources, then free the worker. */
static void
_ecore_thread_kill(Ecore_Pthread_Worker *work)
{
   if (work->cancel)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *)work);
     }
   else
     {
        if (work->func_end)
          work->func_end((void *)work->data, (Ecore_Thread *)work);
     }

   if (work->feedback_run)
     {
        ecore_pipe_del(work->u.feedback_run.notify);

        /* Recycle the dedicated pipe/worker used by no-queue jobs. */
        if (work->u.feedback_run.direct_pipe)
          eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
        if (work->u.feedback_run.direct_worker)
          _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
     }
   /* Tear down the worker's own synchronization objects before freeing. */
   CDD(work->cond);
   LKD(work->mutex);
   if (work->hash)
     eina_hash_free(work->hash);
   free(work);
}
549
550 static void
551 _ecore_thread_handler(void        *data __UNUSED__,
552                       void        *buffer,
553                       unsigned int nbyte)
554 {
555    Ecore_Pthread_Worker *work;
556
557    if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
558
559    work = *(Ecore_Pthread_Worker **)buffer;
560
561    if (work->feedback_run)
562      {
563         if (work->u.feedback_run.send != work->u.feedback_run.received)
564           {
565              work->kill = EINA_TRUE;
566              return;
567           }
568      }
569
570    _ecore_thread_kill(work);
571 }
572
573 static void
574 _ecore_notify_handler(void        *data,
575                       void        *buffer,
576                       unsigned int nbyte)
577 {
578    Ecore_Pthread_Worker *work = data;
579    void *user_data;
580
581    if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;
582
583    user_data = *(void **)buffer;
584    work->u.feedback_run.received++;
585
586    if (work->u.feedback_run.func_notify)
587      work->u.feedback_run.func_notify((void *)work->data, (Ecore_Thread *)work, user_data);
588
589    /* Force reading all notify event before killing the thread */
590    if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
591      {
592         _ecore_thread_kill(work);
593      }
594 }
595
/* Drain the short-job queue from a worker thread: pop jobs one at a
 * time under the queue mutex, run them unlocked, then either re-queue
 * them (reschedule) or report completion through 'end_pipe'. */
static void
_ecore_short_job(Ecore_Pipe *end_pipe)
{
   Ecore_Pthread_Worker *work;

   /* The unlocked read in the loop condition is only a fast path; the
    * authoritative empty-check is redone under the mutex just below. */
   while (_ecore_pending_job_threads)
     {
        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads)
          {
             LKU(_ecore_pending_job_threads_mutex);
             break;
          }

        /* Pop the head of the queue. */
        work = eina_list_data_get(_ecore_pending_job_threads);
        _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
                                                           _ecore_pending_job_threads);

        LKU(_ecore_pending_job_threads_mutex);

        /* Run the job outside the lock. */
        if (!work->cancel)
          work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work);

        if (work->reschedule)
          {
             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
             LKU(_ecore_pending_job_threads_mutex);
          }
        else
          {
             /* Tell the main loop this job is done. */
             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
          }
     }
}
634
/* Drain the feedback-job queue from a worker thread.  Same structure as
 * _ecore_short_job(), but records the executing thread id in work->self
 * before running the heavy callback. */
static void
_ecore_feedback_job(Ecore_Pipe *end_pipe,
                    PH(thread))
{
   Ecore_Pthread_Worker *work;

   /* Unlocked fast-path check; the real check happens under the mutex. */
   while (_ecore_pending_job_threads_feedback)
     {
        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads_feedback)
          {
             LKU(_ecore_pending_job_threads_mutex);
             break;
          }

        /* Pop the head of the feedback queue. */
        work = eina_list_data_get(_ecore_pending_job_threads_feedback);
        _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
                                                                    _ecore_pending_job_threads_feedback);

        LKU(_ecore_pending_job_threads_mutex);

        work->self = thread;
        if (!work->cancel)
          work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);

        if (work->reschedule)
          {
             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
             LKU(_ecore_pending_job_threads_mutex);
          }
        else
          {
             /* Report completion to the main loop. */
             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
          }
     }
}
675
676 static void *
677 _ecore_direct_worker(Ecore_Pthread_Worker *work)
678 {
679    Ecore_Pthread_Data *pth;
680
681 #ifdef EFL_POSIX_THREADS
682    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
683    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
684 #endif
685
686    eina_sched_prio_drop();
687
688    pth = malloc(sizeof (Ecore_Pthread_Data));
689    if (!pth) return NULL;
690
691    pth->p = work->u.feedback_run.direct_pipe;
692    if (!pth->p)
693      {
694         free(pth);
695         return NULL;
696      }
697    pth->thread = PHS();
698
699    work->self = pth->thread;
700    work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);
701
702    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
703
704    work = work->u.feedback_run.direct_worker;
705    if (!work)
706      {
707         free(pth);
708         return NULL;
709      }
710
711    work->data = pth;
712    work->u.short_run.func_blocking = NULL;
713    work->func_end = (void *)_ecore_thread_end;
714    work->func_cancel = NULL;
715    work->cancel = EINA_FALSE;
716    work->feedback_run = EINA_FALSE;
717    work->kill = EINA_FALSE;
718    work->hash = NULL;
719    CDI(work->cond);
720    LKI(work->mutex);
721
722    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
723
724    return pth->p;
725 }
726
727 static void *
728 _ecore_thread_worker(Ecore_Pthread_Data *pth)
729 {
730    Ecore_Pthread_Worker *work;
731
732 #ifdef EFL_POSIX_THREADS
733    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
734    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
735 #endif
736
737    eina_sched_prio_drop();
738
739 restart:
740    if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
741    if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
742
743    /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
744
745    LKL(_ecore_pending_job_threads_mutex);
746    if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
747      {
748         LKU(_ecore_pending_job_threads_mutex);
749         goto restart;
750      }
751    LKU(_ecore_pending_job_threads_mutex);
752
753    /* Sleep a little to prevent premature death */
754 #ifdef _WIN32
755    Sleep(1); /* around 50ms */
756 #else
757    usleep(200);
758 #endif
759
760    LKL(_ecore_pending_job_threads_mutex);
761    if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
762      {
763         LKU(_ecore_pending_job_threads_mutex);
764         goto restart;
765      }
766    LKU(_ecore_pending_job_threads_mutex);
767
768    work = pth->death_job;
769    if (!work) return NULL;
770
771    work->data = pth;
772    work->u.short_run.func_blocking = NULL;
773    work->func_end = (void *)_ecore_thread_end;
774    work->func_cancel = NULL;
775    work->cancel = EINA_FALSE;
776    work->feedback_run = EINA_FALSE;
777    work->kill = EINA_FALSE;
778    work->hash = NULL;
779    CDI(work->cond);
780    LKI(work->mutex);
781
782    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
783
784    return pth->p;
785 }
786
787 #endif
788
789 static Ecore_Pthread_Worker *
790 _ecore_thread_worker_new(void)
791 {
792 #ifdef EFL_HAVE_THREADS
793    Ecore_Pthread_Worker *result;
794
795    result = eina_trash_pop(&_ecore_thread_worker_trash);
796
797    if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
798    else _ecore_thread_worker_count--;
799
800    return result;
801 #else
802    return malloc(sizeof (Ecore_Pthread_Worker));
803 #endif
804 }
805
/* Module init: size the worker pool to the CPU count, create the
 * delayed pipe-deletion event type, and set up the global locks and
 * caches used by the thread machinery. */
void
_ecore_thread_init(void)
{
   _ecore_thread_count_max = eina_cpu_count();
   if (_ecore_thread_count_max <= 0)
     _ecore_thread_count_max = 1;  /* always allow at least one worker */

   ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
   _ecore_thread_pipe = eina_array_new(8);

#ifdef EFL_HAVE_THREADS
   del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);

   LKI(_ecore_pending_job_threads_mutex);
   LRWKI(_ecore_thread_global_hash_lock);
   LKI(_ecore_thread_global_hash_mutex);
   CDI(_ecore_thread_global_hash_cond);
#endif
}
825
/* Module shutdown: cancel pending jobs, forcibly terminate active
 * worker threads, and release the global locks, caches and pipe pool. */
void
_ecore_thread_shutdown(void)
{
   /* FIXME: If function are still running in the background, should we kill them ? */
   Ecore_Pipe *p;
   Eina_Array_Iterator it;
   unsigned int i;

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work;
   Ecore_Pthread_Data *pth;

   LKL(_ecore_pending_job_threads_mutex);

   /* Jobs that never started get their cancel callback, then are freed.
    * NOTE(review): this free()s workers without CDD/LKD on the cond and
    * mutex initialized in ecore_thread_run — leaks the underlying
    * objects where those are heap-allocated (Win32 path); confirm. */
   EINA_LIST_FREE(_ecore_pending_job_threads, work)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *)work);
        free(work);
     }

   EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *)work);
        free(work);
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* Improve emergency shutdown */
   EINA_LIST_FREE(_ecore_active_job_threads, pth)
     {
        Ecore_Pipe *ep;

        /* Forcibly cancel the thread, then join it and drop its pipe. */
        PHA(pth->thread);
        PHJ(pth->thread, ep);

        ecore_pipe_del(pth->p);
     }
   if (_ecore_thread_global_hash)
     eina_hash_free(_ecore_thread_global_hash);
   _ecore_event_handler_del(del_handler);
   have_main_loop_thread = 0;
   del_handler = NULL;

   LKD(_ecore_pending_job_threads_mutex);
   LRWKD(_ecore_thread_global_hash_lock);
   LKD(_ecore_thread_global_hash_mutex);
   CDD(_ecore_thread_global_hash_cond);
#endif

   /* Destroy every cached result pipe. */
   EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
     ecore_pipe_del(p);

   eina_array_free(_ecore_thread_pipe);
   _ecore_thread_pipe = NULL;
}
884
885 void
886 _ecore_thread_assert_main_loop_thread(const char *function)
887 {
888    Eina_Bool good;
889 #ifdef EFL_HAVE_THREADS
890    good = PHE(get_main_loop_thread(), PHS());
891 #else
892    good = EINA_TRUE;
893 #endif
894    if (!good)
895      {
896         EINA_LOG_CRIT("Call to %s from wrong thread!", function);
897 #if 0
898         abort();
899 #endif
900      }
901 }
902
903 EAPI Ecore_Thread *
904 ecore_thread_run(Ecore_Thread_Cb func_blocking,
905                  Ecore_Thread_Cb func_end,
906                  Ecore_Thread_Cb func_cancel,
907                  const void     *data)
908 {
909    Ecore_Pthread_Worker *work;
910 #ifdef EFL_HAVE_THREADS
911    Ecore_Pthread_Data *pth = NULL;
912 #endif
913
914    if (!func_blocking) return NULL;
915
916    work = _ecore_thread_worker_new();
917    if (!work)
918      {
919         if (func_cancel)
920           func_cancel((void *)data, NULL);
921         return NULL;
922      }
923
924    work->u.short_run.func_blocking = func_blocking;
925    work->func_end = func_end;
926    work->func_cancel = func_cancel;
927    work->cancel = EINA_FALSE;
928    work->feedback_run = EINA_FALSE;
929    work->kill = EINA_FALSE;
930    work->reschedule = EINA_FALSE;
931    work->data = data;
932
933 #ifdef EFL_HAVE_THREADS
934    work->hash = NULL;
935    CDI(work->cond);
936    LKI(work->mutex);
937
938    LKL(_ecore_pending_job_threads_mutex);
939    _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
940
941    if (_ecore_thread_count == _ecore_thread_count_max)
942      {
943         LKU(_ecore_pending_job_threads_mutex);
944         return (Ecore_Thread *)work;
945      }
946
947    LKU(_ecore_pending_job_threads_mutex);
948
949    /* One more thread could be created. */
950    pth = malloc(sizeof (Ecore_Pthread_Data));
951    if (!pth) goto on_error;
952
953    pth->p = _ecore_thread_pipe_get();
954    pth->death_job = _ecore_thread_worker_new();
955    if (!pth->p || !pth->death_job) goto on_error;
956
957    eina_threads_init();
958
959    if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
960      {
961         _ecore_thread_count++;
962         return (Ecore_Thread *)work;
963      }
964
965    eina_threads_shutdown();
966
967 on_error:
968    if (pth)
969      {
970         if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
971         if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
972         free(pth);
973      }
974
975    if (_ecore_thread_count == 0)
976      {
977         LKL(_ecore_pending_job_threads_mutex);
978         _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
979         LKU(_ecore_pending_job_threads_mutex);
980
981         if (work->func_cancel)
982           work->func_cancel((void *)work->data, (Ecore_Thread *)work);
983         free(work);
984         work = NULL;
985      }
986    return (Ecore_Thread *)work;
987 #else
988    /*
989       If no thread and as we don't want to break app that rely on this
990       facility, we will lock the interface until we are done.
991     */
992    do {
993         /* Handle reschedule by forcing it here. That would mean locking the app,
994          * would be better with an idler, but really to complex for a case where
995          * thread should really exist.
996          */
997           work->reschedule = EINA_FALSE;
998
999           func_blocking((void *)data, (Ecore_Thread *)work);
1000           if (work->cancel == EINA_FALSE) func_end((void *)data, (Ecore_Thread *)work);
1001           else func_cancel((void *)data, (Ecore_Thread *)work);
1002      } while (work->reschedule == EINA_TRUE);
1003
1004    free(work);
1005
1006    return NULL;
1007 #endif
1008 }
1009
/**
 * Ask @p thread to cancel.
 *
 * Returns EINA_TRUE when the job is gone for good (NULL handle, already
 * being killed, or removed from the pending queue before it ever ran —
 * in which case func_cancel is invoked and the worker freed).  Returns
 * EINA_FALSE when cancellation is only flagged: the running job must
 * notice work->cancel itself (see ecore_thread_check()).
 */
EAPI Eina_Bool
ecore_thread_cancel(Ecore_Thread *thread)
{
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
   Eina_List *l;

   if (!work)
     return EINA_TRUE;
   if (work->cancel)
     return EINA_FALSE;

   if (work->feedback_run)
     {
        if (work->kill)
          return EINA_TRUE;
        /* Feedback still in flight: don't free now, just flag cancel. */
        if (work->u.feedback_run.send != work->u.feedback_run.received)
          goto on_exit;
     }

   LKL(_ecore_pending_job_threads_mutex);

   /* Only the main loop thread may pull a job out of the pending queues. */
   if ((have_main_loop_thread) &&
       (PHE(get_main_loop_thread(), PHS())))
     {
        /* NOTE: the loop variable shadows the original 'work'; the match
         * against 'thread' restores the handle being cancelled. */
        if (!work->feedback_run)
          EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
            {
               if ((void *)work == (void *)thread)
                 {
                    _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *)work->data, (Ecore_Thread *)work);
                    free(work);

                    return EINA_TRUE;
                 }
            }
        else
          EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
            {
               if ((void *)work == (void *)thread)
                 {
                    _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *)work->data, (Ecore_Thread *)work);
                    free(work);

                    return EINA_TRUE;
                 }
            }
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* Delay the destruction */
on_exit:
   ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
   return EINA_FALSE;
#else
   (void) thread;
   return EINA_TRUE;
#endif
}
1080
1081 EAPI Eina_Bool
1082 ecore_thread_check(Ecore_Thread *thread)
1083 {
1084    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1085
1086    if (!worker) return EINA_TRUE;
1087    return worker->cancel;
1088 }
1089
/**
 * Run @p func_heavy in a worker thread with a notification channel back
 * to the main loop: the worker calls ecore_thread_feedback(), which is
 * delivered to @p func_notify via an Ecore_Pipe.
 *
 * When @p try_no_queue is set, a dedicated thread is attempted first,
 * bypassing the shared pool; on failure the job falls back to the queue.
 * Returns the job handle, or NULL when it could not be scheduled (after
 * invoking @p func_cancel, if any).
 */
EAPI Ecore_Thread *
ecore_thread_feedback_run(Ecore_Thread_Cb        func_heavy,
                          Ecore_Thread_Notify_Cb func_notify,
                          Ecore_Thread_Cb        func_end,
                          Ecore_Thread_Cb        func_cancel,
                          const void            *data,
                          Eina_Bool              try_no_queue)
{
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker;
   Ecore_Pthread_Data *pth = NULL;

   if (!func_heavy) return NULL;

   worker = _ecore_thread_worker_new();
   if (!worker) goto on_error;

   worker->u.feedback_run.func_heavy = func_heavy;
   worker->u.feedback_run.func_notify = func_notify;
   worker->hash = NULL;
   CDI(worker->cond);
   LKI(worker->mutex);
   worker->func_cancel = func_cancel;
   worker->func_end = func_end;
   worker->data = data;
   worker->cancel = EINA_FALSE;
   worker->feedback_run = EINA_TRUE;
   worker->kill = EINA_FALSE;
   worker->reschedule = EINA_FALSE;

   worker->u.feedback_run.send = 0;
   worker->u.feedback_run.received = 0;

   /* Pipe carrying feedback data from the worker to the main loop. */
   worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
   worker->u.feedback_run.direct_pipe = NULL;
   worker->u.feedback_run.direct_worker = NULL;

   if (!try_no_queue)
     {
        PH(t);

        /* Dedicated thread outside the pool.
         * NOTE(review): direct_pipe/direct_worker allocations are not
         * checked here before PHC — confirm _ecore_direct_worker copes
         * with NULLs. */
        worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
        worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
        worker->no_queue = EINA_TRUE;

        eina_threads_init();

        if (PHC(t, _ecore_direct_worker, worker) == 0)
          return (Ecore_Thread *)worker;

        eina_threads_shutdown();
     }

   /* Fall back to (or start with) the shared worker pool. */
   worker->no_queue = EINA_FALSE;

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);

   /* Pool is full: an existing worker will pick the job up. */
   if (_ecore_thread_count == _ecore_thread_count_max)
     {
        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *)worker;
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   eina_threads_init();

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
     {
        _ecore_thread_count++;
        return (Ecore_Thread *)worker;
     }

   eina_threads_shutdown();

on_error:
   if (pth)
     {
        /* Recycle the pipe and death job reserved for the failed thread. */
        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
        free(pth);
     }

   /* No worker exists to ever run the job: unqueue it, report
    * cancellation and free the notify pipe. */
   if (_ecore_thread_count == 0)
     {
        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
                                                               worker);
        LKU(_ecore_pending_job_threads_mutex);

        if (func_cancel) func_cancel((void *)data, NULL);

        if (worker)
          {
             ecore_pipe_del(worker->u.feedback_run.notify);
             free(worker);
             worker = NULL;
          }
     }

   return (Ecore_Thread *)worker;
#else
   Ecore_Pthread_Worker worker;

   (void)try_no_queue;

   /*
      If no thread and as we don't want to break app that rely on this
      facility, we will lock the interface until we are done.
    */
   worker.u.feedback_run.func_heavy = func_heavy;
   worker.u.feedback_run.func_notify = func_notify;
   worker.u.feedback_run.notify = NULL;
   worker.u.feedback_run.send = 0;
   worker.u.feedback_run.received = 0;
   worker.func_cancel = func_cancel;
   worker.func_end = func_end;
   worker.data = data;
   worker.cancel = EINA_FALSE;
   worker.feedback_run = EINA_TRUE;
   worker.kill = EINA_FALSE;

   do {
        worker.reschedule = EINA_FALSE;

        func_heavy((void *)data, (Ecore_Thread *)&worker);

        if (worker.cancel) func_cancel((void *)data, (Ecore_Thread *)&worker);
        else func_end((void *)data, (Ecore_Thread *)&worker);
     } while (worker.reschedule == EINA_TRUE);

   return NULL;
#endif
}
1233
1234 EAPI Eina_Bool
1235 ecore_thread_feedback(Ecore_Thread *thread,
1236                       const void   *data)
1237 {
1238    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1239
1240    if (!worker) return EINA_FALSE;
1241    if (!worker->feedback_run) return EINA_FALSE;
1242
1243 #ifdef EFL_HAVE_THREADS
1244    if (!PHE(worker->self, PHS())) return EINA_FALSE;
1245
1246    worker->u.feedback_run.send++;
1247    ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
1248
1249    return EINA_TRUE;
1250 #else
1251    worker->u.feedback_run.func_notify((void *)worker->data, thread, (void *)data);
1252
1253    return EINA_TRUE;
1254 #endif
1255 }
1256
1257 EAPI Eina_Bool
1258 ecore_thread_reschedule(Ecore_Thread *thread)
1259 {
1260    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1261
1262    if (!worker) return EINA_FALSE;
1263
1264 #ifdef EFL_HAVE_THREADS
1265    if (!PHE(worker->self, PHS())) return EINA_FALSE;
1266 #endif
1267
1268    worker->reschedule = EINA_TRUE;
1269    return EINA_TRUE;
1270 }
1271
1272 EAPI int
1273 ecore_thread_active_get(void)
1274 {
1275 #ifdef EFL_HAVE_THREADS
1276    return _ecore_thread_count;
1277 #else
1278    return 0;
1279 #endif
1280 }
1281
1282 EAPI int
1283 ecore_thread_pending_get(void)
1284 {
1285 #ifdef EFL_HAVE_THREADS
1286    int ret;
1287
1288    LKL(_ecore_pending_job_threads_mutex);
1289    ret = eina_list_count(_ecore_pending_job_threads);
1290    LKU(_ecore_pending_job_threads_mutex);
1291    return ret;
1292 #else
1293    return 0;
1294 #endif
1295 }
1296
1297 EAPI int
1298 ecore_thread_pending_feedback_get(void)
1299 {
1300 #ifdef EFL_HAVE_THREADS
1301    int ret;
1302
1303    LKL(_ecore_pending_job_threads_mutex);
1304    ret = eina_list_count(_ecore_pending_job_threads_feedback);
1305    LKU(_ecore_pending_job_threads_mutex);
1306    return ret;
1307 #else
1308    return 0;
1309 #endif
1310 }
1311
1312 EAPI int
1313 ecore_thread_pending_total_get(void)
1314 {
1315 #ifdef EFL_HAVE_THREADS
1316    int ret;
1317
1318    LKL(_ecore_pending_job_threads_mutex);
1319    ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
1320    LKU(_ecore_pending_job_threads_mutex);
1321    return ret;
1322 #else
1323    return 0;
1324 #endif
1325 }
1326
1327 EAPI int
1328 ecore_thread_max_get(void)
1329 {
1330    return _ecore_thread_count_max;
1331 }
1332
1333 EAPI void
1334 ecore_thread_max_set(int num)
1335 {
1336    if (num < 1) return;
1337    /* avoid doing something hilarious by blocking dumb users */
1338    if (num >= (2 * eina_cpu_count())) return;
1339
1340    _ecore_thread_count_max = num;
1341 }
1342
1343 EAPI void
1344 ecore_thread_max_reset(void)
1345 {
1346    _ecore_thread_count_max = eina_cpu_count();
1347 }
1348
1349 EAPI int
1350 ecore_thread_available_get(void)
1351 {
1352 #ifdef EFL_HAVE_THREADS
1353    int ret;
1354
1355    LKL(_ecore_pending_job_threads_mutex);
1356    ret = _ecore_thread_count_max - _ecore_thread_count;
1357    LKU(_ecore_pending_job_threads_mutex);
1358    return ret;
1359 #else
1360    return 0;
1361 #endif
1362 }
1363
1364 EAPI Eina_Bool
1365 ecore_thread_local_data_add(Ecore_Thread *thread,
1366                             const char   *key,
1367                             void         *value,
1368                             Eina_Free_Cb  cb,
1369                             Eina_Bool     direct)
1370 {
1371 #ifdef EFL_HAVE_THREADS
1372    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1373    Ecore_Thread_Data *d;
1374    Eina_Bool ret;
1375 #endif
1376
1377    if ((!thread) || (!key) || (!value))
1378      return EINA_FALSE;
1379 #ifdef EFL_HAVE_THREADS
1380    if (!PHE(worker->self, PHS())) return EINA_FALSE;
1381
1382    if (!worker->hash)
1383      worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1384
1385    if (!worker->hash)
1386      return EINA_FALSE;
1387
1388    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1389      return EINA_FALSE;
1390
1391    d->data = value;
1392    d->cb = cb;
1393
1394    if (direct)
1395      ret = eina_hash_direct_add(worker->hash, key, d);
1396    else
1397      ret = eina_hash_add(worker->hash, key, d);
1398    CDB(worker->cond);
1399    return ret;
1400 #else
1401    (void) cb;
1402    (void) direct;
1403    return EINA_FALSE;
1404 #endif
1405 }
1406
1407 EAPI void *
1408 ecore_thread_local_data_set(Ecore_Thread *thread,
1409                             const char   *key,
1410                             void         *value,
1411                             Eina_Free_Cb  cb)
1412 {
1413 #ifdef EFL_HAVE_THREADS
1414    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1415    Ecore_Thread_Data *d, *r;
1416    void *ret;
1417 #endif
1418
1419    if ((!thread) || (!key) || (!value))
1420      return NULL;
1421 #ifdef EFL_HAVE_THREADS
1422    if (!PHE(worker->self, PHS())) return NULL;
1423
1424    if (!worker->hash)
1425      worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1426
1427    if (!worker->hash)
1428      return NULL;
1429
1430    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1431      return NULL;
1432
1433    d->data = value;
1434    d->cb = cb;
1435
1436    r = eina_hash_set(worker->hash, key, d);
1437    CDB(worker->cond);
1438    ret = r->data;
1439    free(r);
1440    return ret;
1441 #else
1442    (void) cb;
1443    return NULL;
1444 #endif
1445 }
1446
1447 EAPI void *
1448 ecore_thread_local_data_find(Ecore_Thread *thread,
1449                              const char   *key)
1450 {
1451 #ifdef EFL_HAVE_THREADS
1452    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1453    Ecore_Thread_Data *d;
1454 #endif
1455
1456    if ((!thread) || (!key))
1457      return NULL;
1458 #ifdef EFL_HAVE_THREADS
1459    if (!PHE(worker->self, PHS())) return NULL;
1460
1461    if (!worker->hash)
1462      return NULL;
1463
1464    d = eina_hash_find(worker->hash, key);
1465    if (d)
1466      return d->data;
1467    return NULL;
1468 #else
1469    return NULL;
1470 #endif
1471 }
1472
1473 EAPI Eina_Bool
1474 ecore_thread_local_data_del(Ecore_Thread *thread,
1475                             const char   *key)
1476 {
1477 #ifdef EFL_HAVE_THREADS
1478    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
1479 #endif
1480
1481    if ((!thread) || (!key))
1482      return EINA_FALSE;
1483 #ifdef EFL_HAVE_THREADS
1484    if (!PHE(worker->self, PHS())) return EINA_FALSE;
1485
1486    if (!worker->hash)
1487      return EINA_FALSE;
1488    return eina_hash_del_by_key(worker->hash, key);
1489 #else
1490    return EINA_TRUE;
1491 #endif
1492 }
1493
1494 EAPI Eina_Bool
1495 ecore_thread_global_data_add(const char  *key,
1496                              void        *value,
1497                              Eina_Free_Cb cb,
1498                              Eina_Bool    direct)
1499 {
1500 #ifdef EFL_HAVE_THREADS
1501    Ecore_Thread_Data *d;
1502    Eina_Bool ret;
1503 #endif
1504
1505    if ((!key) || (!value))
1506      return EINA_FALSE;
1507 #ifdef EFL_HAVE_THREADS
1508    LRWKWL(_ecore_thread_global_hash_lock);
1509    if (!_ecore_thread_global_hash)
1510      _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1511    LRWKU(_ecore_thread_global_hash_lock);
1512
1513    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1514      return EINA_FALSE;
1515
1516    d->data = value;
1517    d->cb = cb;
1518
1519    if (!_ecore_thread_global_hash)
1520      return EINA_FALSE;
1521    LRWKWL(_ecore_thread_global_hash_lock);
1522    if (direct)
1523      ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
1524    else
1525      ret = eina_hash_add(_ecore_thread_global_hash, key, d);
1526    LRWKU(_ecore_thread_global_hash_lock);
1527    CDB(_ecore_thread_global_hash_cond);
1528    return ret;
1529 #else
1530    (void) cb;
1531    (void) direct;
1532    return EINA_TRUE;
1533 #endif
1534 }
1535
1536 EAPI void *
1537 ecore_thread_global_data_set(const char  *key,
1538                              void        *value,
1539                              Eina_Free_Cb cb)
1540 {
1541 #ifdef EFL_HAVE_THREADS
1542    Ecore_Thread_Data *d, *r;
1543    void *ret;
1544 #endif
1545
1546    if ((!key) || (!value))
1547      return NULL;
1548 #ifdef EFL_HAVE_THREADS
1549    LRWKWL(_ecore_thread_global_hash_lock);
1550    if (!_ecore_thread_global_hash)
1551      _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1552    LRWKU(_ecore_thread_global_hash_lock);
1553
1554    if (!_ecore_thread_global_hash)
1555      return NULL;
1556
1557    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1558      return NULL;
1559
1560    d->data = value;
1561    d->cb = cb;
1562
1563    LRWKWL(_ecore_thread_global_hash_lock);
1564    r = eina_hash_set(_ecore_thread_global_hash, key, d);
1565    LRWKU(_ecore_thread_global_hash_lock);
1566    CDB(_ecore_thread_global_hash_cond);
1567
1568    ret = r->data;
1569    free(r);
1570    return ret;
1571 #else
1572    (void) cb;
1573    return NULL;
1574 #endif
1575 }
1576
1577 EAPI void *
1578 ecore_thread_global_data_find(const char *key)
1579 {
1580 #ifdef EFL_HAVE_THREADS
1581    Ecore_Thread_Data *ret;
1582 #endif
1583
1584    if (!key)
1585      return NULL;
1586 #ifdef EFL_HAVE_THREADS
1587    if (!_ecore_thread_global_hash) return NULL;
1588
1589    LRWKRL(_ecore_thread_global_hash_lock);
1590    ret = eina_hash_find(_ecore_thread_global_hash, key);
1591    LRWKU(_ecore_thread_global_hash_lock);
1592    if (ret)
1593      return ret->data;
1594    return NULL;
1595 #else
1596    return NULL;
1597 #endif
1598 }
1599
1600 EAPI Eina_Bool
1601 ecore_thread_global_data_del(const char *key)
1602 {
1603 #ifdef EFL_HAVE_THREADS
1604    Eina_Bool ret;
1605 #endif
1606
1607    if (!key)
1608      return EINA_FALSE;
1609 #ifdef EFL_HAVE_THREADS
1610    if (!_ecore_thread_global_hash)
1611      return EINA_FALSE;
1612
1613    LRWKWL(_ecore_thread_global_hash_lock);
1614    ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
1615    LRWKU(_ecore_thread_global_hash_lock);
1616    return ret;
1617 #else
1618    return EINA_TRUE;
1619 #endif
1620 }
1621
/**
 * Block until a value appears under @p key in the global thread data
 * hash, or until @p seconds elapse (seconds > 0).  With seconds == 0 the
 * lookup is attempted once without waiting.  Returns the value or NULL.
 */
EAPI void *
ecore_thread_global_data_wait(const char *key,
                              double      seconds)
{
#ifdef EFL_HAVE_THREADS
   double tm = 0;
   Ecore_Thread_Data *ret = NULL;
#endif

   if (!key)
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)
     return NULL;
   /* Absolute deadline for the condition wait below. */
   if (seconds > 0)
     tm = ecore_time_get() + seconds;

   while (1)
     {
        /* NOTE(review): with seconds < 0, tm stays 0 so the timed wait
         * gets an already-expired deadline and this loop likely spins
         * instead of blocking indefinitely — confirm intended behavior. */
#ifndef _WIN32
        struct timespec t = { 0, 0 };

        t.tv_sec = (long int)tm;
        t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
#else
        struct timeval t = { 0, 0 };

        t.tv_sec = (long int)tm;
        t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
#endif
        LRWKRL(_ecore_thread_global_hash_lock);
        ret = eina_hash_find(_ecore_thread_global_hash, key);
        LRWKU(_ecore_thread_global_hash_lock);
        /* Stop when found, when not waiting at all, or past the deadline. */
        if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
          break;
        /* Sleep until ecore_thread_global_data_add()/set() broadcasts. */
        LKL(_ecore_thread_global_hash_mutex);
        CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
        LKU(_ecore_thread_global_hash_mutex);
     }
   if (ret) return ret->data;
   return NULL;
#else
   (void) seconds;
   return NULL;
#endif
}
1668