/* src/lib/ecore/ecore_thread.c */
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif

#include <sys/time.h>
#include <assert.h>

#ifdef HAVE_EVIL
# include <Evil.h>
#endif

#include "Ecore.h"
#include "ecore_private.h"

#ifdef EFL_HAVE_THREADS

# ifdef EFL_HAVE_POSIX_THREADS
#  include <pthread.h>
#  ifdef __linux__
#   include <sched.h>
#   include <sys/resource.h>
#   include <unistd.h>
#   include <sys/syscall.h>
#   include <errno.h>
#  endif

#  define PH(x)        pthread_t x
#  define PHE(x, y)    pthread_equal(x, y)
#  define PHS()        pthread_self()
#  define PHC(x, f, d) pthread_create(&(x), NULL, (void*) f, d)
#  define PHJ(x, p)    pthread_join(x, (void**)(&(p)))
#  define PHA(x)       pthread_cancel(x)

#  define CD(x)  pthread_cond_t x
#  define CDI(x) pthread_cond_init(&(x), NULL);
#  define CDD(x) pthread_cond_destroy(&(x));
#  define CDB(x) pthread_cond_broadcast(&(x));
#  define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);

#  define LK(x)  pthread_mutex_t x
#  define LKI(x) pthread_mutex_init(&(x), NULL);
#  define LKD(x) pthread_mutex_destroy(&(x));
#  define LKL(x) pthread_mutex_lock(&(x));
#  define LKU(x) pthread_mutex_unlock(&(x));

#  define LRWK(x)   pthread_rwlock_t x
#  define LRWKI(x)  pthread_rwlock_init(&(x), NULL);
#  define LRWKD(x)  pthread_rwlock_destroy(&(x));
#  define LRWKWL(x) pthread_rwlock_wrlock(&(x));
#  define LRWKRL(x) pthread_rwlock_rdlock(&(x));
#  define LRWKU(x)  pthread_rwlock_unlock(&(x));

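/*
 * The PH (thread), CD (condition), LK (lock) and LRWK (rwlock) macro
 * families above abstract the platform threading primitives so the
 * rest of this file is written once for both the POSIX and the Win32
 * back-ends. A minimal sketch of how they compose (my_lock is a
 * hypothetical name, for illustration only):
 *
 *   static LK(my_lock);   (declares a mutex)
 *   LKI(my_lock);         (init)
 *   LKL(my_lock);         (lock)
 *   ...critical section...
 *   LKU(my_lock);         (unlock)
 *   LKD(my_lock);         (destroy)
 */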
# else /* EFL_HAVE_WIN32_THREADS */

#  define WIN32_LEAN_AND_MEAN
#  include <windows.h>
#  undef WIN32_LEAN_AND_MEAN

typedef struct
{
  HANDLE thread;
  void *val;
} win32_thread;

#  define PH(x)        win32_thread *x
#  define PHE(x, y)    ((x) == (y))
#  define PHS()        (HANDLE)GetCurrentThreadId()

int _ecore_thread_win32_create(win32_thread **x, LPTHREAD_START_ROUTINE f, void *d)
{
  win32_thread *t;
  t = (win32_thread *)calloc(1, sizeof(win32_thread));
  if (!t)
    return -1;

  t->thread = CreateThread(NULL, 0, f, d, 0, NULL);
  if (!t->thread)
    {
      free(t);
      return -1;
    }
  t->val = d;
  *x = t;

  return 0;
}
#  define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)

int _ecore_thread_win32_join(win32_thread *x, void **res)
{
  if (!PHE(x, PHS()))
    {
      WaitForSingleObject(x->thread, INFINITE);
      CloseHandle(x->thread);
    }
  if (res) *res = x->val;
  free(x);

  return 0;
}

#  define PHJ(x, p) _ecore_thread_win32_join(x, (void**)(&(p)))
#  define PHA(x) TerminateThread(x->thread, 0)

#  define LK(x)  HANDLE x
#  define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
#  define LKD(x) CloseHandle(x)
#  define LKL(x) WaitForSingleObject(x, INFINITE)
#  define LKU(x) ReleaseMutex(x)

typedef struct
{
  HANDLE semaphore;
  LONG threads_count;
  CRITICAL_SECTION threads_count_lock;
} win32_cond;

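/*
 * Condition variables are emulated on Win32 with a counting semaphore
 * plus a critical section protecting the waiter count: CDW registers
 * the caller as a waiter and blocks on the semaphore, while CDB
 * releases the semaphore once per registered waiter. Only the
 * broadcast/timedwait subset used in this file is supported; note the
 * mutex release and semaphore wait in CDW are not atomic, so a wakeup
 * can be missed in a narrow window.
 */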
#  define CD(x)  win32_cond *x

#  define CDI(x)                                                     \
   do {                                                              \
     x = (win32_cond *)calloc(1, sizeof(win32_cond));                \
     if (x)                                                          \
        {                                                            \
          x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
          if (x->semaphore)                                          \
            InitializeCriticalSection(&x->threads_count_lock);       \
          else                                                       \
            {                                                        \
              free(x);                                               \
              x = NULL;                                              \
            }                                                        \
        }                                                            \
   } while (0)

#  define CDD(x)               \
  do {                         \
    CloseHandle(x->semaphore); \
    free(x);                   \
    x = NULL;                  \
   } while (0)

#  define CDB(x)                                            \
do {                                                        \
  EnterCriticalSection(&x->threads_count_lock);             \
  if (x->threads_count > 0)                                 \
    ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
  LeaveCriticalSection(&x->threads_count_lock);             \
 } while (0)

int _ecore_thread_win32_cond_timedwait(win32_cond *c, HANDLE external_mutex, struct timeval *t)
{
  DWORD res;
  DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
  LKL(external_mutex);
  EnterCriticalSection(&c->threads_count_lock);
  c->threads_count++;
  LeaveCriticalSection(&c->threads_count_lock);
  LKU(external_mutex);
  res = WaitForSingleObject(c->semaphore, val);
  /* Deregister the waiter so later broadcasts do not over-release. */
  EnterCriticalSection(&c->threads_count_lock);
  c->threads_count--;
  LeaveCriticalSection(&c->threads_count_lock);
  if (res == WAIT_OBJECT_0)
    return 0;
  else
    return -1;
}
#  define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)

typedef struct
{
  LONG readers_count;
  LONG writers_count;
  int readers;
  int writers;
  LK(mutex);
  CD(cond_read);
  CD(cond_write);
} win32_rwl;

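/*
 * Read/write locks are likewise emulated: readers_count/writers_count
 * track threads queued for the lock, readers/writers track threads
 * currently holding it, and the two emulated condition variables park
 * waiting readers and writers separately. On unlock, queued readers
 * are preferred when a writer releases, and one queued writer is
 * woken when the last reader leaves; no further fairness is attempted.
 */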
#  define LRWK(x)   win32_rwl *x
#  define LRWKI(x)                                 \
  do {                                             \
    x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
    if (x)                                         \
      {                                            \
        LKI(x->mutex);                             \
        if (x->mutex)                              \
          {                                        \
            CDI(x->cond_read);                     \
            if (x->cond_read)                      \
              {                                    \
                CDI(x->cond_write);                \
                if (!x->cond_write)                \
                  {                                \
                    CDD(x->cond_read);             \
                    LKD(x->mutex);                 \
                    free(x);                       \
                    x = NULL;                      \
                  }                                \
              }                                    \
            else                                   \
              {                                    \
                LKD(x->mutex);                     \
                free(x);                           \
                x = NULL;                          \
              }                                    \
          }                                        \
        else                                       \
          {                                        \
            free(x);                               \
            x = NULL;                              \
          }                                        \
      }                                            \
  } while (0)

#  define LRWKD(x)                   \
  do {                               \
    LKU(x->mutex);                   \
    LKD(x->mutex);                   \
    CDD(x->cond_write);              \
    CDD(x->cond_read);               \
    free(x);                         \
  } while (0)
#  define LRWKWL(x)                                                        \
  do {                                                                     \
    DWORD res = 0;                                                         \
    LKL(x->mutex);                                                         \
    if (x->writers || x->readers > 0)                                      \
      {                                                                    \
        x->writers_count++;                                                \
        while (x->writers || x->readers > 0)                               \
          {                                                                \
            EnterCriticalSection(&x->cond_write->threads_count_lock);      \
            x->cond_write->threads_count++;                                \
            LeaveCriticalSection(&x->cond_write->threads_count_lock);      \
            res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
            if (res != WAIT_OBJECT_0) break;                               \
          }                                                                \
        x->writers_count--;                                                \
      }                                                                    \
    if (res == 0) x->writers = 1;                                          \
    LKU(x->mutex);                                                         \
  } while (0)
#  define LRWKRL(x)                                                        \
  do {                                                                     \
    DWORD res = 0;                                                         \
    LKL(x->mutex);                                                         \
    if (x->writers)                                                        \
      {                                                                    \
        x->readers_count++;                                                \
        while (x->writers)                                                 \
          {                                                                \
            EnterCriticalSection(&x->cond_read->threads_count_lock);       \
            x->cond_read->threads_count++;                                 \
            LeaveCriticalSection(&x->cond_read->threads_count_lock);       \
            res = WaitForSingleObject(x->cond_read->semaphore, INFINITE);  \
            if (res != WAIT_OBJECT_0) break;                               \
          }                                                                \
        x->readers_count--;                                                \
      }                                                                    \
    if (res == 0)                                                          \
      x->readers++;                                                        \
    LKU(x->mutex);                                                         \
  } while (0)
#  define LRWKU(x)                                                     \
  do {                                                                 \
    LKL(x->mutex);                                                     \
    if (x->writers)                                                    \
      {                                                                \
        x->writers = 0;                                                \
        if (x->readers_count == 1)                                     \
          {                                                            \
            EnterCriticalSection(&x->cond_read->threads_count_lock);   \
            if (x->cond_read->threads_count > 0)                       \
              ReleaseSemaphore(x->cond_read->semaphore, 1, NULL);      \
            LeaveCriticalSection(&x->cond_read->threads_count_lock);   \
          }                                                            \
        else if (x->readers_count > 0)                                 \
          CDB(x->cond_read);                                           \
        else if (x->writers_count > 0)                                 \
          {                                                            \
            EnterCriticalSection(&x->cond_write->threads_count_lock);  \
            if (x->cond_write->threads_count > 0)                      \
              ReleaseSemaphore(x->cond_write->semaphore, 1, NULL);     \
            LeaveCriticalSection(&x->cond_write->threads_count_lock);  \
          }                                                            \
      }                                                                \
    else if (x->readers > 0)                                           \
      {                                                                \
        x->readers--;                                                  \
        if (x->readers == 0 && x->writers_count > 0)                   \
          {                                                            \
            EnterCriticalSection(&x->cond_write->threads_count_lock);  \
            if (x->cond_write->threads_count > 0)                      \
              ReleaseSemaphore(x->cond_write->semaphore, 1, NULL);     \
            LeaveCriticalSection(&x->cond_write->threads_count_lock);  \
          }                                                            \
      }                                                                \
    LKU(x->mutex);                                                     \
  } while (0)

# endif

#endif

typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
typedef struct _Ecore_Pthread Ecore_Pthread;
typedef struct _Ecore_Thread_Data  Ecore_Thread_Data;

struct _Ecore_Thread_Data
{
   void *data;
   Eina_Free_Cb cb;
};

struct _Ecore_Pthread_Worker
{
   union {
      struct {
         Ecore_Thread_Cb func_blocking;
      } short_run;
      struct {
         Ecore_Thread_Cb func_heavy;
         Ecore_Thread_Notify_Cb func_notify;
         Ecore_Pipe *notify;

         Ecore_Pipe *direct_pipe;
         Ecore_Pthread_Worker *direct_worker;

         int send;
         int received;
      } feedback_run;
   } u;

   Ecore_Thread_Cb func_cancel;
   Ecore_Thread_Cb func_end;
#ifdef EFL_HAVE_THREADS
   PH(self);
   Eina_Hash *hash;
   CD(cond);
   LK(mutex);
#endif

   const void *data;

   Eina_Bool cancel : 1;
   Eina_Bool feedback_run : 1;
   Eina_Bool kill : 1;
   Eina_Bool reschedule : 1;
   Eina_Bool no_queue : 1;
};

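/*
 * An Ecore_Pthread_Worker describes one submitted job. The union
 * reflects the two scheduling modes: short_run jobs (ecore_thread_run)
 * carry only the blocking function, while feedback_run jobs
 * (ecore_thread_feedback_run) additionally own a notify pipe back to
 * the main loop and, when running outside the pool queue, a dedicated
 * pipe and worker used to report their own termination.
 */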
#ifdef EFL_HAVE_THREADS
typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;

struct _Ecore_Pthread_Data
{
   Ecore_Pthread_Worker *death_job;
   Ecore_Pipe *p;
   void *data;
   PH(thread);
};
#endif

static void _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte);

static int _ecore_thread_count_max = 0;
static int ECORE_THREAD_PIPE_DEL = 0;
static Eina_Array *_ecore_thread_pipe = NULL;

static Ecore_Pipe*
_ecore_thread_pipe_get(void)
{
   if (eina_array_count_get(_ecore_thread_pipe) > 0)
     return eina_array_pop(_ecore_thread_pipe);

   return ecore_pipe_add(_ecore_thread_handler, NULL);
}

#ifdef EFL_HAVE_THREADS
static int _ecore_thread_count = 0;

static Ecore_Event_Handler *del_handler = NULL;
static Eina_List *_ecore_active_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads_feedback = NULL;
static LK(_ecore_pending_job_threads_mutex);

static Eina_Hash *_ecore_thread_global_hash = NULL;
static LRWK(_ecore_thread_global_hash_lock);
static LK(_ecore_thread_global_hash_mutex);
static CD(_ecore_thread_global_hash_cond);

static LK(_ecore_main_loop_mutex);
static Eina_Bool have_main_loop_thread = 0;

static Eina_Trash *_ecore_thread_worker_trash = NULL;
static int _ecore_thread_worker_count = 0;

static void *_ecore_thread_worker(Ecore_Pthread_Data *pth);
static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);

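/* Detect the main loop thread lazily; comparing getpid() lets the
 * cached thread id be refreshed after a fork(), where the child's
 * main loop thread differs from the parent's. */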
static PH(get_main_loop_thread)(void)
{
  static PH(main_loop_thread);
  static pid_t main_loop_pid;
  pid_t pid = getpid();

  if (pid != main_loop_pid)
    {
       main_loop_pid = pid;
       main_loop_thread = PHS();
       have_main_loop_thread = 1;
    }

  return main_loop_thread;
}

static void
_ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
{
   if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
     {
        free(worker);
        return ;
     }

   /* Keep the worker cached for reuse; count cached workers so the
    * cap above can actually trigger. */
   _ecore_thread_worker_count++;
   eina_trash_push(&_ecore_thread_worker_trash, worker);
}

static void
_ecore_thread_data_free(void *data)
{
   Ecore_Thread_Data *d = data;

   if (d->cb) d->cb(d->data);
   free(d);
}

static void
_ecore_thread_pipe_free(void *data __UNUSED__, void *event)
{
   Ecore_Pipe *p = event;

   if (eina_array_count_get(_ecore_thread_pipe) < 50)
     eina_array_push(_ecore_thread_pipe, p);
   else
     ecore_pipe_del(p);
   eina_threads_shutdown();
}

static Eina_Bool
_ecore_thread_pipe_del(void *data __UNUSED__, int type __UNUSED__, void *event __UNUSED__)
{
   /* This is a hack to delay pipe destruction until we are out of its internal loop. */
   return ECORE_CALLBACK_CANCEL;
}

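/* Called on the main loop side when a pool thread exits: join it and,
 * if jobs are still queued and the pool is under its limit, respawn a
 * replacement thread instead of tearing the slot down. */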
static void
_ecore_thread_end(Ecore_Pthread_Data *pth, Ecore_Thread *work)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) work;
   Ecore_Pipe *p;

   if (!worker->feedback_run || !worker->no_queue)
     _ecore_thread_count--;

   if (PHJ(pth->thread, p) != 0)
     return ;

   if (eina_list_count(_ecore_pending_job_threads) > 0
       && (unsigned int) _ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
       && _ecore_thread_count < _ecore_thread_count_max)
     {
        /* One more thread should be created. */
        INF("spawning threads because of still pending jobs.");

        pth->death_job = _ecore_thread_worker_new();
        if (!pth->p || !pth->death_job) goto end;

        eina_threads_init();

        if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
          {
             _ecore_thread_count++;
             return ;
          }

        eina_threads_shutdown();

     end:
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
     }

   _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);

   ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
   free(pth);
}

static void
_ecore_thread_kill(Ecore_Pthread_Worker *work)
{
   if (work->cancel)
     {
        if (work->func_cancel)
          work->func_cancel((void *) work->data, (Ecore_Thread *) work);
     }
   else
     {
        if (work->func_end)
          work->func_end((void *) work->data, (Ecore_Thread *) work);
     }

   if (work->feedback_run)
     {
        ecore_pipe_del(work->u.feedback_run.notify);

        if (work->u.feedback_run.direct_pipe)
          eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
        if (work->u.feedback_run.direct_worker)
          _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
     }
   CDD(work->cond);
   LKD(work->mutex);
   if (work->hash)
     eina_hash_free(work->hash);
   free(work);
}

static void
_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
{
   Ecore_Pthread_Worker *work;

   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;

   work = *(Ecore_Pthread_Worker **)buffer;

   if (work->feedback_run)
     {
        if (work->u.feedback_run.send != work->u.feedback_run.received)
          {
             work->kill = EINA_TRUE;
             return ;
          }
     }

   _ecore_thread_kill(work);
}

static void
_ecore_notify_handler(void *data, void *buffer, unsigned int nbyte)
{
   Ecore_Pthread_Worker *work = data;
   void *user_data;

   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;

   user_data = *(void **)buffer;
   work->u.feedback_run.received++;

   if (work->u.feedback_run.func_notify)
     work->u.feedback_run.func_notify((void *) work->data, (Ecore_Thread *) work, user_data);

   /* Force reading all notify events before killing the thread. */
   if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
     {
        _ecore_thread_kill(work);
     }
}

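/* Drain the short-job queue from a pool thread: pop one job at a time
 * under the queue mutex, run it unlocked, then either requeue it (if
 * it asked to be rescheduled) or report completion through the pipe. */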
static void
_ecore_short_job(Ecore_Pipe *end_pipe)
{
   Ecore_Pthread_Worker *work;

   while (_ecore_pending_job_threads)
     {
        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads)
          {
             LKU(_ecore_pending_job_threads_mutex);
             break;
          }

        work = eina_list_data_get(_ecore_pending_job_threads);
        _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
                                                           _ecore_pending_job_threads);

        LKU(_ecore_pending_job_threads_mutex);

        if (!work->cancel)
          work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);

        if (work->reschedule)
          {
             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
             LKU(_ecore_pending_job_threads_mutex);
          }
        else
          {
             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
          }
     }
}

static void
_ecore_feedback_job(Ecore_Pipe *end_pipe, PH(thread))
{
   Ecore_Pthread_Worker *work;

   while (_ecore_pending_job_threads_feedback)
     {
        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads_feedback)
          {
             LKU(_ecore_pending_job_threads_mutex);
             break;
          }

        work = eina_list_data_get(_ecore_pending_job_threads_feedback);
        _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
                                                                    _ecore_pending_job_threads_feedback);

        LKU(_ecore_pending_job_threads_mutex);

        work->self = thread;
        if (!work->cancel)
          work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);

        if (work->reschedule)
          {
             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
             LKU(_ecore_pending_job_threads_mutex);
          }
        else
          {
             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
          }
     }
}

static void *
_ecore_direct_worker(Ecore_Pthread_Worker *work)
{
   Ecore_Pthread_Data *pth;

#ifdef EFL_HAVE_POSIX_THREADS
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif

   eina_sched_prio_drop();

   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) return NULL;

   pth->p = work->u.feedback_run.direct_pipe;
   if (!pth->p)
     {
        free(pth);
        return NULL;
     }
   pth->thread = PHS();

   work->self = pth->thread;
   work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   work = work->u.feedback_run.direct_worker;
   if (!work)
     {
        free(pth);
        return NULL;
     }

   work->data = pth;
   work->u.short_run.func_blocking = NULL;
   work->func_end = (void *) _ecore_thread_end;
   work->func_cancel = NULL;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->hash = NULL;
   CDI(work->cond);
   LKI(work->mutex);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   return pth->p;
}

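/* Main body of a pool thread: keep draining both job queues, linger
 * briefly in case new work arrives, and finally report its own death
 * to the main loop through the pre-allocated death_job worker. */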
static void *
_ecore_thread_worker(Ecore_Pthread_Data *pth)
{
   Ecore_Pthread_Worker *work;

#ifdef EFL_HAVE_POSIX_THREADS
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif

   eina_sched_prio_drop();

 restart:
   if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
   if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);

   /* FIXME: Check whether there are feedback jobs still to run, and switch to the feedback run handler. */

   LKL(_ecore_pending_job_threads_mutex);
   if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
     {
        LKU(_ecore_pending_job_threads_mutex);
        goto restart;
     }
   LKU(_ecore_pending_job_threads_mutex);

   /* Sleep a little to prevent premature death */
#ifdef _WIN32
   Sleep(1); /* nominally 1ms; in practice rounded up to a scheduler tick */
#else
   usleep(200);
#endif

   LKL(_ecore_pending_job_threads_mutex);
   if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
     {
        LKU(_ecore_pending_job_threads_mutex);
        goto restart;
     }
   LKU(_ecore_pending_job_threads_mutex);

   work = pth->death_job;
   if (!work) return NULL;

   work->data = pth;
   work->u.short_run.func_blocking = NULL;
   work->func_end = (void *) _ecore_thread_end;
   work->func_cancel = NULL;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->hash = NULL;
   CDI(work->cond);
   LKI(work->mutex);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   return pth->p;
}

#endif

static Ecore_Pthread_Worker *
_ecore_thread_worker_new(void)
{
   Ecore_Pthread_Worker *result;

#ifdef EFL_HAVE_THREADS
   result = eina_trash_pop(&_ecore_thread_worker_trash);

   if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
   else _ecore_thread_worker_count--;

   return result;
#else
   return malloc(sizeof (Ecore_Pthread_Worker));
#endif
}

void
_ecore_thread_init(void)
{
   _ecore_thread_count_max = eina_cpu_count();
   if (_ecore_thread_count_max <= 0)
     _ecore_thread_count_max = 1;

   ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
   _ecore_thread_pipe = eina_array_new(8);

#ifdef EFL_HAVE_THREADS
   del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);

   LKI(_ecore_pending_job_threads_mutex);
   LRWKI(_ecore_thread_global_hash_lock);
   LKI(_ecore_thread_global_hash_mutex);
   LKI(_ecore_main_loop_mutex);
   CDI(_ecore_thread_global_hash_cond);
#endif
}

void
_ecore_thread_shutdown(void)
{
   /* FIXME: If functions are still running in the background, should we kill them? */
   Ecore_Pipe *p;
   Eina_Array_Iterator it;
   unsigned int i;

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work;
   Ecore_Pthread_Data *pth;

   LKL(_ecore_pending_job_threads_mutex);

   EINA_LIST_FREE(_ecore_pending_job_threads, work)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *) work);
        free(work);
     }

   EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *) work);
        free(work);
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* Emergency shutdown: forcibly cancel and join the remaining worker threads. */
   EINA_LIST_FREE(_ecore_active_job_threads, pth)
     {
        Ecore_Pipe *ep;

        PHA(pth->thread);
        PHJ(pth->thread, ep);

        ecore_pipe_del(pth->p);
     }
   if (_ecore_thread_global_hash)
     eina_hash_free(_ecore_thread_global_hash);
   ecore_event_handler_del(del_handler);
   have_main_loop_thread = 0;
   del_handler = NULL;

   LKD(_ecore_pending_job_threads_mutex);
   LRWKD(_ecore_thread_global_hash_lock);
   LKD(_ecore_thread_global_hash_mutex);
   CDD(_ecore_thread_global_hash_cond);
#endif

   EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
     ecore_pipe_del(p);

   eina_array_free(_ecore_thread_pipe);
   _ecore_thread_pipe = NULL;
}

void
_ecore_thread_assert_main_loop_thread(const char *function)
{
   Eina_Bool good;
#ifdef EFL_HAVE_THREADS
   good = PHE(get_main_loop_thread(), PHS());
#else
   good = EINA_TRUE;
#endif
   if (!good)
     {
        EINA_LOG_CRIT("Call to %s from wrong thread!", function);
#if 0
        abort();
#endif
     }
}

#ifdef HAVE_THREAD_SAFETY
static int lock_count;

void
_ecore_lock(void)
{
  LKL(_ecore_main_loop_mutex);
  lock_count++;
  assert(lock_count == 1);
}

void
_ecore_unlock(void)
{
  lock_count--;
  assert(lock_count == 0);
  LKU(_ecore_main_loop_mutex);
}
#endif

EAPI Ecore_Thread *
ecore_thread_run(Ecore_Thread_Cb func_blocking,
                 Ecore_Thread_Cb func_end,
                 Ecore_Thread_Cb func_cancel,
                 const void *data)
{
   Ecore_Pthread_Worker *work;
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Data *pth = NULL;
#endif

   if (!func_blocking) return NULL;

   work = _ecore_thread_worker_new();
   if (!work)
     {
        if (func_cancel)
          func_cancel((void *) data, NULL);
        return NULL;
     }

   work->u.short_run.func_blocking = func_blocking;
   work->func_end = func_end;
   work->func_cancel = func_cancel;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->reschedule = EINA_FALSE;
   work->data = data;

#ifdef EFL_HAVE_THREADS
   work->hash = NULL;
   CDI(work->cond);
   LKI(work->mutex);

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);

   if (_ecore_thread_count == _ecore_thread_count_max)
     {
        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *) work;
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   eina_threads_init();

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
     {
        _ecore_thread_count++;
        return (Ecore_Thread *) work;
     }

   eina_threads_shutdown();

 on_error:
   if (pth)
     {
        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
        free(pth);
     }

   if (_ecore_thread_count == 0)
     {
        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
        LKU(_ecore_pending_job_threads_mutex);

        if (work->func_cancel)
          work->func_cancel((void *) work->data, (Ecore_Thread *) work);
        free(work);
        work = NULL;
     }
   return (Ecore_Thread *) work;
#else
   /*
     Without thread support, and since we don't want to break apps that
     rely on this facility, we block the caller until the work is done.
    */
   do {
      /* Handle reschedule by forcing it here. This blocks the app; an
       * idler would be nicer, but that is too complex for a case where
       * threads should really be available.
       */
      work->reschedule = EINA_FALSE;

      func_blocking((void *)data, (Ecore_Thread *) work);
      if (work->cancel == EINA_FALSE) func_end((void *)data, (Ecore_Thread *) work);
      else func_cancel((void *)data, (Ecore_Thread *) work);

   } while (work->reschedule == EINA_TRUE);

   free(work);

   return NULL;
#endif
}

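/*
 * Typical usage of ecore_thread_run() (a sketch; _job, _job_end,
 * heavy_computation() and update_ui() are illustrative names, not part
 * of the API):
 *
 *   static void
 *   _job(void *data, Ecore_Thread *thread)
 *   {
 *      heavy_computation(data);   <- runs in a pool thread
 *   }
 *
 *   static void
 *   _job_end(void *data, Ecore_Thread *thread)
 *   {
 *      update_ui(data);           <- runs back in the main loop
 *   }
 *
 *   ecore_thread_run(_job, _job_end, NULL, my_data);
 */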
EAPI Eina_Bool
ecore_thread_cancel(Ecore_Thread *thread)
{
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
   Eina_List *l;

   if (!work)
     return EINA_TRUE;
   if (work->cancel)
     return EINA_FALSE;

   if (work->feedback_run)
     {
        if (work->kill)
          return EINA_TRUE;
        if (work->u.feedback_run.send != work->u.feedback_run.received)
          goto on_exit;
     }

   LKL(_ecore_pending_job_threads_mutex);

   if ((have_main_loop_thread) &&
       (PHE(get_main_loop_thread(), PHS())))
     {
        if (!work->feedback_run)
          EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
            {
               if ((void *) work == (void *) thread)
                 {
                    _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *) work->data, (Ecore_Thread *) work);
                    free(work);

                    return EINA_TRUE;
                 }
            }
        else
          EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
            {
               if ((void *) work == (void *) thread)
                 {
                    _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *) work->data, (Ecore_Thread *) work);
                    free(work);

                    return EINA_TRUE;
                 }
            }
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* Delay the destruction */
 on_exit:
   ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
   return EINA_FALSE;
#else
   return EINA_TRUE;
#endif
}

EAPI Eina_Bool
ecore_thread_check(Ecore_Thread *thread)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if (!worker) return EINA_TRUE;
   return worker->cancel;
}

EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
                                             Ecore_Thread_Notify_Cb func_notify,
                                             Ecore_Thread_Cb func_end,
                                             Ecore_Thread_Cb func_cancel,
                                             const void *data,
                                             Eina_Bool try_no_queue)
{
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker;
   Ecore_Pthread_Data *pth = NULL;

   if (!func_heavy) return NULL;

   worker = _ecore_thread_worker_new();
   if (!worker) goto on_error;

   worker->u.feedback_run.func_heavy = func_heavy;
   worker->u.feedback_run.func_notify = func_notify;
   worker->hash = NULL;
   CDI(worker->cond);
   LKI(worker->mutex);
   worker->func_cancel = func_cancel;
   worker->func_end = func_end;
   worker->data = data;
   worker->cancel = EINA_FALSE;
   worker->feedback_run = EINA_TRUE;
   worker->kill = EINA_FALSE;
   worker->reschedule = EINA_FALSE;

   worker->u.feedback_run.send = 0;
   worker->u.feedback_run.received = 0;

   worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
   worker->u.feedback_run.direct_pipe = NULL;
   worker->u.feedback_run.direct_worker = NULL;

   if (try_no_queue)
     {
        PH(t);

        worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
        worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
        worker->no_queue = EINA_TRUE;

        eina_threads_init();

        if (PHC(t, _ecore_direct_worker, worker) == 0)
           return (Ecore_Thread *) worker;

        eina_threads_shutdown();
     }

   worker->no_queue = EINA_FALSE;

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);

   if (_ecore_thread_count == _ecore_thread_count_max)
     {
        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *) worker;
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   eina_threads_init();

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
     {
        _ecore_thread_count++;
        return (Ecore_Thread *) worker;
     }

   eina_threads_shutdown();

 on_error:
   if (pth)
     {
        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
        free(pth);
     }

   if (_ecore_thread_count == 0)
     {
        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
                                                               worker);
        LKU(_ecore_pending_job_threads_mutex);

        if (func_cancel) func_cancel((void *) data, NULL);

        if (worker)
          {
             ecore_pipe_del(worker->u.feedback_run.notify);
             free(worker);
             worker = NULL;
          }
     }

   return (Ecore_Thread *) worker;
#else
   Ecore_Pthread_Worker worker;

   (void) try_no_queue;

   /*
     Without thread support, and since we don't want to break apps that
     rely on this facility, we block the caller until the work is done.
    */
   worker.u.feedback_run.func_heavy = func_heavy;
   worker.u.feedback_run.func_notify = func_notify;
   worker.u.feedback_run.notify = NULL;
   worker.u.feedback_run.send = 0;
   worker.u.feedback_run.received = 0;
   worker.func_cancel = func_cancel;
   worker.func_end = func_end;
   worker.data = data;
   worker.cancel = EINA_FALSE;
   worker.feedback_run = EINA_TRUE;
   worker.kill = EINA_FALSE;

   do {
      worker.reschedule = EINA_FALSE;

      func_heavy((void *)data, (Ecore_Thread *) &worker);

      if (worker.cancel) func_cancel((void *)data, (Ecore_Thread *) &worker);
      else func_end((void *)data, (Ecore_Thread *) &worker);
   } while (worker.reschedule == EINA_TRUE);

   return NULL;
#endif
}

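/*
 * Typical usage of ecore_thread_feedback_run() (a sketch; _heavy,
 * _notify, _done, _canceled, make_progress() and show_progress() are
 * illustrative names, not part of the API):
 *
 *   static void
 *   _heavy(void *data, Ecore_Thread *thread)
 *   {
 *      int i;
 *      for (i = 0; i < 100; i++)
 *        ecore_thread_feedback(thread, make_progress(i));
 *   }
 *
 *   static void
 *   _notify(void *data, Ecore_Thread *thread, void *msg)
 *   {
 *      show_progress(msg);        <- consumed in the main loop
 *   }
 *
 *   ecore_thread_feedback_run(_heavy, _notify, _done, _canceled,
 *                             my_data, EINA_FALSE);
 */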
EAPI Eina_Bool
ecore_thread_feedback(Ecore_Thread *thread, const void *data)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if (!worker) return EINA_FALSE;
   if (!worker->feedback_run) return EINA_FALSE;

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   worker->u.feedback_run.send++;
   ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));

   return EINA_TRUE;
#else
   worker->u.feedback_run.func_notify((void*) worker->data, thread, (void*) data);

   return EINA_TRUE;
#endif
}

EAPI Eina_Bool
ecore_thread_reschedule(Ecore_Thread *thread)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if (!worker) return EINA_FALSE;

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;
#endif

   worker->reschedule = EINA_TRUE;
   return EINA_TRUE;
}

EAPI int
ecore_thread_active_get(void)
{
#ifdef EFL_HAVE_THREADS
   return _ecore_thread_count;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_pending_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads);
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_pending_feedback_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads_feedback);
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_pending_total_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_max_get(void)
{
   return _ecore_thread_count_max;
}

EAPI void
ecore_thread_max_set(int num)
{
   if (num < 1) return;
   /* avoid doing something hilarious by blocking dumb users */
   if (num >= (2 * eina_cpu_count())) return;

   _ecore_thread_count_max = num;
}

EAPI void
ecore_thread_max_reset(void)
{
   _ecore_thread_count_max = eina_cpu_count();
}

EAPI int
ecore_thread_available_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = _ecore_thread_count_max - _ecore_thread_count;
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI Eina_Bool
ecore_thread_local_data_add(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
   Ecore_Thread_Data *d;
   Eina_Bool ret;

   if ((!thread) || (!key) || (!value))
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   if (!worker->hash)
     worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);

   if (!worker->hash)
     return EINA_FALSE;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return EINA_FALSE;

   d->data = value;
   d->cb = cb;

   if (direct)
     ret = eina_hash_direct_add(worker->hash, key, d);
   else
     ret = eina_hash_add(worker->hash, key, d);
   CDB(worker->cond);
   return ret;
#else
   return EINA_TRUE;
#endif
}

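/*
 * Example (a sketch; the "cache" key, my_cache and my_cache_free are
 * hypothetical). Note that the local-data helpers only succeed when
 * called from the thread that is running the job itself:
 *
 *   ecore_thread_local_data_add(thread, "cache", my_cache,
 *                               my_cache_free, EINA_FALSE);
 *   ...
 *   My_Cache *c = ecore_thread_local_data_find(thread, "cache");
 */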
EAPI void *
ecore_thread_local_data_set(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
   Ecore_Thread_Data *d, *r;
   void *ret;

   if ((!thread) || (!key) || (!value))
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return NULL;

   if (!worker->hash)
     worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);

   if (!worker->hash)
     return NULL;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return NULL;

   d->data = value;
   d->cb = cb;

   r = eina_hash_set(worker->hash, key, d);
   CDB(worker->cond);
   /* eina_hash_set() returns the previous entry, or NULL if the key was unset. */
   if (!r) return NULL;
   ret = r->data;
   free(r);
   return ret;
#else
   return NULL;
#endif
}

EAPI void *
ecore_thread_local_data_find(Ecore_Thread *thread, const char *key)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
   Ecore_Thread_Data *d;

   if ((!thread) || (!key))
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return NULL;

   if (!worker->hash)
     return NULL;

   d = eina_hash_find(worker->hash, key);
   if (d)
     return d->data;
   return NULL;
#else
   return NULL;
#endif
}

EAPI Eina_Bool
ecore_thread_local_data_del(Ecore_Thread *thread, const char *key)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if ((!thread) || (!key))
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   if (!worker->hash)
     return EINA_FALSE;
   return eina_hash_del_by_key(worker->hash, key);
#else
   return EINA_TRUE;
#endif
}

EAPI Eina_Bool
ecore_thread_global_data_add(const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
{
   Eina_Bool ret;
   Ecore_Thread_Data *d;

   if ((!key) || (!value))
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   LRWKWL(_ecore_thread_global_hash_lock);
   if (!_ecore_thread_global_hash)
     _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
   LRWKU(_ecore_thread_global_hash_lock);

   /* Check before allocating so a failed hash creation does not leak d. */
   if (!_ecore_thread_global_hash)
     return EINA_FALSE;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return EINA_FALSE;

   d->data = value;
   d->cb = cb;

   LRWKWL(_ecore_thread_global_hash_lock);
   if (direct)
     ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
   else
     ret = eina_hash_add(_ecore_thread_global_hash, key, d);
   LRWKU(_ecore_thread_global_hash_lock);
   CDB(_ecore_thread_global_hash_cond);
   return ret;
#else
   return EINA_TRUE;
#endif
}

EAPI void *
ecore_thread_global_data_set(const char *key, void *value, Eina_Free_Cb cb)
{
   Ecore_Thread_Data *d, *r;
   void *ret;

   if ((!key) || (!value))
     return NULL;
#ifdef EFL_HAVE_THREADS
   LRWKWL(_ecore_thread_global_hash_lock);
   if (!_ecore_thread_global_hash)
     _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
   LRWKU(_ecore_thread_global_hash_lock);

   if (!_ecore_thread_global_hash)
     return NULL;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return NULL;

   d->data = value;
   d->cb = cb;

   LRWKWL(_ecore_thread_global_hash_lock);
   r = eina_hash_set(_ecore_thread_global_hash, key, d);
   LRWKU(_ecore_thread_global_hash_lock);
   CDB(_ecore_thread_global_hash_cond);

   /* eina_hash_set() returns the previous entry, or NULL if the key was unset. */
   if (!r) return NULL;
   ret = r->data;
   free(r);
   return ret;
#else
   return NULL;
#endif
}

EAPI void *
ecore_thread_global_data_find(const char *key)
{
   Ecore_Thread_Data *ret;

   if (!key)
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash) return NULL;

   LRWKRL(_ecore_thread_global_hash_lock);
   ret = eina_hash_find(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);
   if (ret)
     return ret->data;
   return NULL;
#else
   return NULL;
#endif
}

EAPI Eina_Bool
ecore_thread_global_data_del(const char *key)
{
   Eina_Bool ret;

   if (!key)
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)
     return EINA_FALSE;

   LRWKWL(_ecore_thread_global_hash_lock);
   ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);
   return ret;
#else
   return EINA_TRUE;
#endif
}

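/* Wait until a value appears under the key or the timeout elapses:
 * seconds == 0 checks once without waiting, seconds > 0 waits up to
 * that deadline, and a negative value keeps waiting until the key
 * appears. Wakeups come from the broadcasts done by the global-data
 * writers above. */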
EAPI void *
ecore_thread_global_data_wait(const char *key, double seconds)
{
   double tm = 0;
   Ecore_Thread_Data *ret = NULL;

   if (!key)
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)
     return NULL;
   if (seconds > 0)
     tm = ecore_time_get() + seconds;

   while (1)
     {
#ifndef _WIN32
        struct timespec t = { 0, 0 };

        t.tv_sec = (long int)tm;
        t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
#else
        struct timeval t = { 0, 0 };

        t.tv_sec = (long int)tm;
        t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
#endif
        LRWKRL(_ecore_thread_global_hash_lock);
        ret = eina_hash_find(_ecore_thread_global_hash, key);
        LRWKU(_ecore_thread_global_hash_lock);
        if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
          break;
        LKL(_ecore_thread_global_hash_mutex);
        CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
        LKU(_ecore_thread_global_hash_mutex);
     }
   if (ret) return ret->data;
   return NULL;
#else
   return NULL;
#endif
}