/* src/lib/ecore/ecore_thread.c */
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif

#include <sys/time.h>

#ifdef HAVE_EVIL
# include <Evil.h>
#endif

#include "Ecore.h"
#include "ecore_private.h"

#ifdef EFL_HAVE_THREADS

# ifdef EFL_HAVE_POSIX_THREADS
#  include <pthread.h>
#  ifdef __linux__
#   include <sched.h>
#   include <sys/resource.h>
#   include <unistd.h>
#   include <sys/syscall.h>
#   include <errno.h>
#  endif

#  define PH(x)        pthread_t x
#  define PHE(x, y)    pthread_equal(x, y)
#  define PHS()        pthread_self()
#  define PHC(x, f, d) pthread_create(&(x), NULL, (void*) f, d)
#  define PHJ(x, p)    pthread_join(x, (void**)(&(p)))
#  define PHA(x)       pthread_cancel(x)

#  define CD(x)  pthread_cond_t x
#  define CDI(x) pthread_cond_init(&(x), NULL);
#  define CDD(x) pthread_cond_destroy(&(x));
#  define CDB(x) pthread_cond_broadcast(&(x));
#  define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);

#  define LK(x)  pthread_mutex_t x
#  define LKI(x) pthread_mutex_init(&(x), NULL);
#  define LKD(x) pthread_mutex_destroy(&(x));
#  define LKL(x) pthread_mutex_lock(&(x));
#  define LKU(x) pthread_mutex_unlock(&(x));

#  define LRWK(x)   pthread_rwlock_t x
#  define LRWKI(x)  pthread_rwlock_init(&(x), NULL);
#  define LRWKD(x)  pthread_rwlock_destroy(&(x));
#  define LRWKWL(x) pthread_rwlock_wrlock(&(x));
#  define LRWKRL(x) pthread_rwlock_rdlock(&(x));
#  define LRWKU(x)  pthread_rwlock_unlock(&(x));

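/* The macro families above form a small portability layer:
 *   PH*   - thread create/join/cancel/self
 *   LK*   - mutex init/destroy/lock/unlock
 *   CD*   - condition variable init/destroy/broadcast/timed wait
 *   LRWK* - rwlock init/destroy/write-lock/read-lock/unlock
 * The Win32 branch below re-implements the same macros on top of
 * CreateThread(), CreateMutex() and semaphores. */
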
# else /* EFL_HAVE_WIN32_THREADS */

#  define WIN32_LEAN_AND_MEAN
#  include <windows.h>
#  undef WIN32_LEAN_AND_MEAN

typedef struct
{
  HANDLE thread;
  void *val;
} win32_thread;

#  define PH(x)        win32_thread *x
#  define PHE(x, y)    ((x) == (y))
#  define PHS()        (HANDLE)GetCurrentThreadId()

int _ecore_thread_win32_create(win32_thread **x, LPTHREAD_START_ROUTINE f, void *d)
{
  win32_thread *t;

  t = (win32_thread *)calloc(1, sizeof(win32_thread));
  if (!t)
    return -1;

  t->thread = CreateThread(NULL, 0, f, d, 0, NULL);
  if (!t->thread)
    {
      free(t);
      return -1;
    }
  t->val = d;
  *x = t;

  return 0;
}
#  define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)

int _ecore_thread_win32_join(win32_thread *x, void **res)
{
  if (!PHE(x, PHS()))
    {
      WaitForSingleObject(x->thread, INFINITE);
      CloseHandle(x->thread);
    }
  if (res) *res = x->val;
  free(x);

  return 0;
}

#  define PHJ(x, p) _ecore_thread_win32_join(x, (void**)(&(p)))
#  define PHA(x) TerminateThread(x->thread, 0)

#  define LK(x)  HANDLE x
#  define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
#  define LKD(x) CloseHandle(x)
#  define LKL(x) WaitForSingleObject(x, INFINITE)
#  define LKU(x) ReleaseMutex(x)

typedef struct
{
  HANDLE semaphore;
  LONG threads_count;
  CRITICAL_SECTION threads_count_lock;
} win32_cond;

#  define CD(x)  win32_cond *x

#  define CDI(x)                                                     \
   do {                                                              \
     x = (win32_cond *)calloc(1, sizeof(win32_cond));                \
     if (x)                                                          \
        {                                                            \
          x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
          if (x->semaphore)                                          \
            InitializeCriticalSection(&x->threads_count_lock);       \
          else                                                       \
            {                                                        \
              free(x);                                               \
              x = NULL;                                              \
            }                                                        \
        }                                                            \
   } while (0)

#  define CDD(x)               \
  do {                         \
    CloseHandle(x->semaphore); \
    free(x);                   \
    x = NULL;                  \
   } while (0)

#  define CDB(x)                                            \
do {                                                        \
  EnterCriticalSection(&x->threads_count_lock);             \
  if (x->threads_count > 0)                                 \
    ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
  LeaveCriticalSection(&x->threads_count_lock);             \
 } while (0)

int _ecore_thread_win32_cond_timedwait(win32_cond *c, HANDLE *external_mutex, struct timeval *t)
{
  DWORD res;
  DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);

  LKL(external_mutex);
  EnterCriticalSection(&c->threads_count_lock);
  c->threads_count++;
  LeaveCriticalSection(&c->threads_count_lock);
  LKU(external_mutex);
  res = WaitForSingleObject(c->semaphore, val);
  if (res == WAIT_OBJECT_0)
    return 0;
  else
    return -1;
}
#  define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)

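/* Note: this emulation is looser than pthread_cond_timedwait(). The
 * external mutex is released before the semaphore wait but is not
 * re-acquired on wakeup, and threads_count is never decremented, so a
 * broadcast leaves the semaphore signalled for the cumulative number of
 * past waiters. The timed waits in this file only use CDW() as an
 * interruptible sleep, so both quirks are tolerated. */
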
typedef struct
{
  LONG readers_count;
  LONG writers_count;
  int readers;
  int writers;
  LK(mutex);
  CD(cond_read);
  CD(cond_write);
} win32_rwl;

#  define LRWK(x)   win32_rwl *x
#  define LRWKI(x)                                 \
  do {                                             \
    x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
    if (x)                                         \
      {                                            \
        LKI(x->mutex);                             \
        if (x->mutex)                              \
          {                                        \
            CDI(x->cond_read);                     \
            if (x->cond_read)                      \
              {                                    \
                CDI(x->cond_write);                \
                if (!x->cond_write)                \
                  {                                \
                    CDD(x->cond_read);             \
                    LKD(x->mutex);                 \
                    free(x);                       \
                    x = NULL;                      \
                  }                                \
              }                                    \
            else                                   \
              {                                    \
                LKD(x->mutex);                     \
                free(x);                           \
                x = NULL;                          \
              }                                    \
          }                                        \
        else                                       \
          {                                        \
            free(x);                               \
            x = NULL;                              \
          }                                        \
      }                                            \
  } while (0)

#  define LRWKD(x)                   \
  do {                               \
    LKU(x->mutex);                   \
    LKD(x->mutex);                   \
    CDD(x->cond_write);              \
    CDD(x->cond_read);               \
    free(x);                         \
  } while (0)
#  define LRWKWL(x)                                                        \
  do {                                                                     \
    DWORD res = 0; /* WAIT_OBJECT_0; also covers the no-wait path */       \
    LKL(x->mutex); /* take the lock; it was wrongly released here */       \
    if (x->writers || x->readers > 0)                                      \
      {                                                                    \
        x->writers_count++;                                                \
        while (x->writers || x->readers > 0)                               \
          {                                                                \
            /* a waiting writer must be counted on cond_write so that */   \
            /* LRWKU()'s release of cond_write actually wakes it */        \
            EnterCriticalSection(&x->cond_write->threads_count_lock);      \
            x->cond_write->threads_count++;                                \
            LeaveCriticalSection(&x->cond_write->threads_count_lock);      \
            res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
            if (res != WAIT_OBJECT_0) break;                               \
          }                                                                \
        x->writers_count--;                                                \
      }                                                                    \
    if (res == 0) x->writers = 1; /* mark the lock write-held */           \
    LKU(x->mutex);                                                         \
  } while (0)
#  define LRWKRL(x)                                                       \
  do {                                                                    \
    DWORD res = 0; /* WAIT_OBJECT_0; also covers the no-wait path */      \
    LKL(x->mutex);                                                        \
    if (x->writers)                                                       \
      {                                                                   \
        x->readers_count++;                                               \
        while (x->writers)                                                \
          {                                                               \
            /* readers wait on cond_read, which LRWKU() signals */        \
            EnterCriticalSection(&x->cond_read->threads_count_lock);      \
            x->cond_read->threads_count++;                                \
            LeaveCriticalSection(&x->cond_read->threads_count_lock);      \
            res = WaitForSingleObject(x->cond_read->semaphore, INFINITE); \
            if (res != WAIT_OBJECT_0) break;                              \
          }                                                               \
        x->readers_count--;                                               \
      }                                                                   \
    if (res == 0)                                                         \
      x->readers++;                                                       \
    LKU(x->mutex);                                                        \
  } while (0)
#  define LRWKU(x)                                                     \
  do {                                                                 \
    LKL(x->mutex);                                                     \
    if (x->writers)                                                    \
      {                                                                \
        x->writers = 0;                                                \
        if (x->readers_count == 1)                                     \
          {                                                            \
            EnterCriticalSection(&x->cond_read->threads_count_lock);   \
            if (x->cond_read->threads_count > 0)                       \
              ReleaseSemaphore(x->cond_read->semaphore, 1, 0);         \
            LeaveCriticalSection(&x->cond_read->threads_count_lock);   \
          }                                                            \
        else if (x->readers_count > 0)                                 \
          CDB(x->cond_read);                                           \
        else if (x->writers_count > 0)                                 \
          {                                                            \
            EnterCriticalSection(&x->cond_write->threads_count_lock);  \
            if (x->cond_write->threads_count > 0)                      \
              ReleaseSemaphore(x->cond_write->semaphore, 1, 0);        \
            LeaveCriticalSection(&x->cond_write->threads_count_lock);  \
          }                                                            \
      }                                                                \
    else if (x->readers > 0)                                           \
      {                                                                \
        x->readers--;                                                  \
        if (x->readers == 0 && x->writers_count > 0)                   \
          {                                                            \
            EnterCriticalSection(&x->cond_write->threads_count_lock);  \
            if (x->cond_write->threads_count > 0)                      \
              ReleaseSemaphore(x->cond_write->semaphore, 1, 0);        \
            LeaveCriticalSection(&x->cond_write->threads_count_lock);  \
          }                                                            \
      }                                                                \
    LKU(x->mutex);                                                     \
  } while (0)

# endif

#endif

typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
typedef struct _Ecore_Pthread Ecore_Pthread;
typedef struct _Ecore_Thread_Data  Ecore_Thread_Data;

struct _Ecore_Thread_Data
{
   void *data;
   Eina_Free_Cb cb;
};

struct _Ecore_Pthread_Worker
{
   union {
      struct {
         Ecore_Thread_Cb func_blocking;
      } short_run;
      struct {
         Ecore_Thread_Cb func_heavy;
         Ecore_Thread_Notify_Cb func_notify;
         Ecore_Pipe *notify;

         Ecore_Pipe *direct_pipe;
         Ecore_Pthread_Worker *direct_worker;

         int send;
         int received;
      } feedback_run;
   } u;

   Ecore_Thread_Cb func_cancel;
   Ecore_Thread_Cb func_end;
#ifdef EFL_HAVE_THREADS
   PH(self);
   Eina_Hash *hash;
   CD(cond);
   LK(mutex);
#endif

   const void *data;

   Eina_Bool cancel : 1;
   Eina_Bool feedback_run : 1;
   Eina_Bool kill : 1;
   Eina_Bool reschedule : 1;
   Eina_Bool no_queue : 1;
};

#ifdef EFL_HAVE_THREADS
typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;

struct _Ecore_Pthread_Data
{
   Ecore_Pthread_Worker *death_job;
   Ecore_Pipe *p;
   void *data;
   PH(thread);
};
#endif

static void _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte);

static int _ecore_thread_count_max = 0;
static int ECORE_THREAD_PIPE_DEL = 0;
static Eina_Array *_ecore_thread_pipe = NULL;

static Ecore_Pipe*
_ecore_thread_pipe_get(void)
{
   if (eina_array_count_get(_ecore_thread_pipe) > 0)
     return eina_array_pop(_ecore_thread_pipe);

   return ecore_pipe_add(_ecore_thread_handler, NULL);
}

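/* Pipes are pooled: each worker thread needs an Ecore_Pipe to hand results
 * back to the main loop, and recycling them through _ecore_thread_pipe
 * avoids allocating a fresh pipe for every short job. */
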
#ifdef EFL_HAVE_THREADS
static int _ecore_thread_count = 0;

static Ecore_Event_Handler *del_handler = NULL;
static Eina_List *_ecore_active_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads_feedback = NULL;
static LK(_ecore_pending_job_threads_mutex);

static Eina_Hash *_ecore_thread_global_hash = NULL;
static LRWK(_ecore_thread_global_hash_lock);
static LK(_ecore_thread_global_hash_mutex);
static CD(_ecore_thread_global_hash_cond);

static Eina_Bool have_main_loop_thread = 0;

static Eina_Trash *_ecore_thread_worker_trash = NULL;
static int _ecore_thread_worker_count = 0;

static void *_ecore_thread_worker(Ecore_Pthread_Data *pth);
static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);

static PH(get_main_loop_thread)(void)
{
  static PH(main_loop_thread);
  static pid_t main_loop_pid;
  pid_t pid = getpid();

  if (pid != main_loop_pid)
    {
       main_loop_pid = pid;
       main_loop_thread = PHS();
       have_main_loop_thread = 1;
    }

  return main_loop_thread;
}

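/* The main loop thread is cached keyed by pid: after a fork(), getpid()
 * changes, so the first thread to call in from the child process becomes
 * the recorded main loop thread for that process. */
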
static void
_ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
{
   if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
     {
        free(worker);
        return ;
     }

   /* Keep the trash counter in sync; _ecore_thread_worker_new() decrements
    * it on every pop, so the cap above can actually trigger. */
   _ecore_thread_worker_count++;
   eina_trash_push(&_ecore_thread_worker_trash, worker);
}

static void
_ecore_thread_data_free(void *data)
{
   Ecore_Thread_Data *d = data;

   if (d->cb) d->cb(d->data);
   free(d);
}

static void
_ecore_thread_pipe_free(void *data __UNUSED__, void *event)
{
   Ecore_Pipe *p = event;

   if (eina_array_count_get(_ecore_thread_pipe) < 50)
     eina_array_push(_ecore_thread_pipe, p);
   else
     ecore_pipe_del(p);
   eina_threads_shutdown();
}

static Eina_Bool
_ecore_thread_pipe_del(void *data __UNUSED__, int type __UNUSED__, void *event __UNUSED__)
{
   /* This is a hack to delay pipe destruction until we are out of its internal loop. */
   return ECORE_CALLBACK_CANCEL;
}

static void
_ecore_thread_end(Ecore_Pthread_Data *pth, Ecore_Thread *work)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) work;
   Ecore_Pipe *p;

   if (!worker->feedback_run || !worker->no_queue)
     _ecore_thread_count--;

   if (PHJ(pth->thread, p) != 0)
     return ;

   if (eina_list_count(_ecore_pending_job_threads) > 0
       && (unsigned int) _ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
       && _ecore_thread_count < _ecore_thread_count_max)
     {
        /* One more thread should be created. */
        INF("spawning threads because of still pending jobs.");

        pth->death_job = _ecore_thread_worker_new();
        if (!pth->p || !pth->death_job) goto end;

        eina_threads_init();

        if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
          {
             _ecore_thread_count++;
             return ;
          }

        eina_threads_shutdown();

     end:
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
     }

   _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);

   ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
   free(pth);
}

static void
_ecore_thread_kill(Ecore_Pthread_Worker *work)
{
   if (work->cancel)
     {
        if (work->func_cancel)
          work->func_cancel((void *) work->data, (Ecore_Thread *) work);
     }
   else
     {
        if (work->func_end)
          work->func_end((void *) work->data, (Ecore_Thread *) work);
     }

   if (work->feedback_run)
     {
        ecore_pipe_del(work->u.feedback_run.notify);

        if (work->u.feedback_run.direct_pipe)
          eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
        if (work->u.feedback_run.direct_worker)
          _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
     }
   CDD(work->cond);
   LKD(work->mutex);
   if (work->hash)
     eina_hash_free(work->hash);
   free(work);
}

static void
_ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
{
   Ecore_Pthread_Worker *work;

   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;

   work = *(Ecore_Pthread_Worker **)buffer;

   if (work->feedback_run)
     {
        if (work->u.feedback_run.send != work->u.feedback_run.received)
          {
             work->kill = EINA_TRUE;
             return ;
          }
     }

   _ecore_thread_kill(work);
}

static void
_ecore_notify_handler(void *data, void *buffer, unsigned int nbyte)
{
   Ecore_Pthread_Worker *work = data;
   void *user_data;

   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;

   user_data = *(void **)buffer;
   work->u.feedback_run.received++;

   if (work->u.feedback_run.func_notify)
     work->u.feedback_run.func_notify((void *) work->data, (Ecore_Thread *) work, user_data);

   /* Force reading all notify events before killing the thread */
   if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
     {
        _ecore_thread_kill(work);
     }
}

static void
_ecore_short_job(Ecore_Pipe *end_pipe)
{
   Ecore_Pthread_Worker *work;

   while (_ecore_pending_job_threads)
     {
        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads)
          {
             LKU(_ecore_pending_job_threads_mutex);
             break;
          }

        work = eina_list_data_get(_ecore_pending_job_threads);
        _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
                                                           _ecore_pending_job_threads);

        LKU(_ecore_pending_job_threads_mutex);

        if (!work->cancel)
          work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);

        if (work->reschedule)
          {
             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
             LKU(_ecore_pending_job_threads_mutex);
          }
        else
          {
             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
          }
     }
}

static void
_ecore_feedback_job(Ecore_Pipe *end_pipe, PH(thread))
{
   Ecore_Pthread_Worker *work;

   while (_ecore_pending_job_threads_feedback)
     {
        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads_feedback)
          {
             LKU(_ecore_pending_job_threads_mutex);
             break;
          }

        work = eina_list_data_get(_ecore_pending_job_threads_feedback);
        _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
                                                                    _ecore_pending_job_threads_feedback);

        LKU(_ecore_pending_job_threads_mutex);

        work->self = thread;
        if (!work->cancel)
          work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);

        if (work->reschedule)
          {
             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
             LKU(_ecore_pending_job_threads_mutex);
          }
        else
          {
             ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
          }
     }
}

static void *
_ecore_direct_worker(Ecore_Pthread_Worker *work)
{
   Ecore_Pthread_Data *pth;

#ifdef EFL_HAVE_POSIX_THREADS
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif

   eina_sched_prio_drop();

   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) return NULL;

   pth->p = work->u.feedback_run.direct_pipe;
   if (!pth->p)
     {
        free(pth);
        return NULL;
     }
   pth->thread = PHS();

   work->self = pth->thread;
   work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   work = work->u.feedback_run.direct_worker;
   if (!work)
     {
        free(pth);
        return NULL;
     }

   work->data = pth;
   work->u.short_run.func_blocking = NULL;
   work->func_end = (void *) _ecore_thread_end;
   work->func_cancel = NULL;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->hash = NULL;
   CDI(work->cond);
   LKI(work->mutex);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   return pth->p;
}

static void *
_ecore_thread_worker(Ecore_Pthread_Data *pth)
{
   Ecore_Pthread_Worker *work;

#ifdef EFL_HAVE_POSIX_THREADS
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
#endif

   eina_sched_prio_drop();

 restart:
   if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
   if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);

   /* FIXME: Check if there is a feedback task still running, and switch to the feedback run handler. */

   LKL(_ecore_pending_job_threads_mutex);
   if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
     {
        LKU(_ecore_pending_job_threads_mutex);
        goto restart;
     }
   LKU(_ecore_pending_job_threads_mutex);

   /* Sleep a little to prevent premature death */
#ifdef _WIN32
   Sleep(1); /* no usleep() on Win32; granularity is at least 1ms */
#else
   usleep(200);
#endif

   LKL(_ecore_pending_job_threads_mutex);
   if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
     {
        LKU(_ecore_pending_job_threads_mutex);
        goto restart;
     }
   LKU(_ecore_pending_job_threads_mutex);

   work = pth->death_job;
   if (!work) return NULL;

   work->data = pth;
   work->u.short_run.func_blocking = NULL;
   work->func_end = (void *) _ecore_thread_end;
   work->func_cancel = NULL;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->hash = NULL;
   CDI(work->cond);
   LKI(work->mutex);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   return pth->p;
}

#endif

static Ecore_Pthread_Worker *
_ecore_thread_worker_new(void)
{
   Ecore_Pthread_Worker *result;

#ifdef EFL_HAVE_THREADS
   result = eina_trash_pop(&_ecore_thread_worker_trash);

   if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
   else _ecore_thread_worker_count--;

   return result;
#else
   return malloc(sizeof (Ecore_Pthread_Worker));
#endif
}

void
_ecore_thread_init(void)
{
   _ecore_thread_count_max = eina_cpu_count();
   if (_ecore_thread_count_max <= 0)
     _ecore_thread_count_max = 1;

   ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
   _ecore_thread_pipe = eina_array_new(8);

#ifdef EFL_HAVE_THREADS
   del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);

   LKI(_ecore_pending_job_threads_mutex);
   LRWKI(_ecore_thread_global_hash_lock);
   LKI(_ecore_thread_global_hash_mutex);
   CDI(_ecore_thread_global_hash_cond);
#endif
}

void
_ecore_thread_shutdown(void)
{
   /* FIXME: If functions are still running in the background, should we kill them? */
   Ecore_Pipe *p;
   Eina_Array_Iterator it;
   unsigned int i;

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work;
   Ecore_Pthread_Data *pth;

   LKL(_ecore_pending_job_threads_mutex);

   EINA_LIST_FREE(_ecore_pending_job_threads, work)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *) work);
        free(work);
     }

   EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
     {
        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *) work);
        free(work);
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* Improve emergency shutdown */
   EINA_LIST_FREE(_ecore_active_job_threads, pth)
     {
        Ecore_Pipe *ep;

        PHA(pth->thread);
        PHJ(pth->thread, ep);

        ecore_pipe_del(pth->p);
     }
   if (_ecore_thread_global_hash)
     eina_hash_free(_ecore_thread_global_hash);
   ecore_event_handler_del(del_handler);
   have_main_loop_thread = 0;
   del_handler = NULL;

   LKD(_ecore_pending_job_threads_mutex);
   LRWKD(_ecore_thread_global_hash_lock);
   LKD(_ecore_thread_global_hash_mutex);
   CDD(_ecore_thread_global_hash_cond);
#endif

   EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
     ecore_pipe_del(p);

   eina_array_free(_ecore_thread_pipe);
   _ecore_thread_pipe = NULL;
}

void
_ecore_thread_assert_main_loop_thread(const char *function)
{
   Eina_Bool good;
#ifdef EFL_HAVE_THREADS
   good = PHE(get_main_loop_thread(), PHS());
#else
   good = EINA_TRUE;
#endif
   if (!good)
     {
        EINA_LOG_CRIT("Call to %s from wrong thread!", function);
        abort();
     }
}

EAPI Ecore_Thread *
ecore_thread_run(Ecore_Thread_Cb func_blocking,
                 Ecore_Thread_Cb func_end,
                 Ecore_Thread_Cb func_cancel,
                 const void *data)
{
   Ecore_Pthread_Worker *work;
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Data *pth = NULL;
#endif

   if (!func_blocking) return NULL;

   work = _ecore_thread_worker_new();
   if (!work)
     {
        if (func_cancel)
          func_cancel((void *) data, NULL);
        return NULL;
     }

   work->u.short_run.func_blocking = func_blocking;
   work->func_end = func_end;
   work->func_cancel = func_cancel;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->reschedule = EINA_FALSE;
   work->data = data;

#ifdef EFL_HAVE_THREADS
   work->hash = NULL;
   CDI(work->cond);
   LKI(work->mutex);

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);

   if (_ecore_thread_count == _ecore_thread_count_max)
     {
        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *) work;
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   eina_threads_init();

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
     {
        _ecore_thread_count++;
        return (Ecore_Thread *) work;
     }

   eina_threads_shutdown();

 on_error:
   if (pth)
     {
        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
        free(pth);
     }

   if (_ecore_thread_count == 0)
     {
        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
        LKU(_ecore_pending_job_threads_mutex);

        if (work->func_cancel)
          work->func_cancel((void *) work->data, (Ecore_Thread *) work);
        free(work);
        work = NULL;
     }
   return (Ecore_Thread *) work;
#else
   /*
     With no thread support, and as we don't want to break apps that rely
     on this facility, we block the interface until the job is done.
    */
   do {
      /* Handle reschedule by forcing it here. That means blocking the app;
       * an idler would be nicer, but that is really too complex for a case
       * where threads should exist anyway.
       */
      work->reschedule = EINA_FALSE;

      func_blocking((void *)data, (Ecore_Thread *) work);
      if (work->cancel == EINA_FALSE) func_end((void *)data, (Ecore_Thread *) work);
      else func_cancel((void *)data, (Ecore_Thread *) work);

   } while (work->reschedule == EINA_TRUE);

   free(work);

   return NULL;
#endif
}

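/* Typical use (illustrative sketch; the callback names are hypothetical):
 *
 *   static void _job(void *data, Ecore_Thread *th)   { ... blocking work ... }
 *   static void _done(void *data, Ecore_Thread *th)  { ... main loop, success ... }
 *   static void _abort(void *data, Ecore_Thread *th) { ... main loop, cancelled ... }
 *
 *   ecore_thread_run(_job, _done, _abort, ctx);
 *
 * _job() runs on a pooled thread; _done() or _abort() runs later from the
 * main loop, once the worker reports back through its pipe. */
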
EAPI Eina_Bool
ecore_thread_cancel(Ecore_Thread *thread)
{
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
   Eina_List *l;

   if (!work)
     return EINA_TRUE;
   if (work->cancel)
     return EINA_FALSE;

   if (work->feedback_run)
     {
        if (work->kill)
          return EINA_TRUE;
        if (work->u.feedback_run.send != work->u.feedback_run.received)
          goto on_exit;
     }

   LKL(_ecore_pending_job_threads_mutex);

   if ((have_main_loop_thread) &&
       (PHE(get_main_loop_thread(), PHS())))
     {
        if (!work->feedback_run)
          EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
            {
               if ((void *) work == (void *) thread)
                 {
                    _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *) work->data, (Ecore_Thread *) work);
                    free(work);

                    return EINA_TRUE;
                 }
            }
        else
          EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
            {
               if ((void *) work == (void *) thread)
                 {
                    _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *) work->data, (Ecore_Thread *) work);
                    free(work);

                    return EINA_TRUE;
                 }
            }
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* Delay the destruction */
 on_exit:
   ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
   return EINA_FALSE;
#else
   return EINA_TRUE;
#endif
}

EAPI Eina_Bool
ecore_thread_check(Ecore_Thread *thread)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if (!worker) return EINA_TRUE;
   return worker->cancel;
}

EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
                                             Ecore_Thread_Notify_Cb func_notify,
                                             Ecore_Thread_Cb func_end,
                                             Ecore_Thread_Cb func_cancel,
                                             const void *data,
                                             Eina_Bool try_no_queue)
{

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker;
   Ecore_Pthread_Data *pth = NULL;

   if (!func_heavy) return NULL;

   worker = _ecore_thread_worker_new();
   if (!worker) goto on_error;

   worker->u.feedback_run.func_heavy = func_heavy;
   worker->u.feedback_run.func_notify = func_notify;
   worker->hash = NULL;
   CDI(worker->cond);
   LKI(worker->mutex);
   worker->func_cancel = func_cancel;
   worker->func_end = func_end;
   worker->data = data;
   worker->cancel = EINA_FALSE;
   worker->feedback_run = EINA_TRUE;
   worker->kill = EINA_FALSE;
   worker->reschedule = EINA_FALSE;

   worker->u.feedback_run.send = 0;
   worker->u.feedback_run.received = 0;

   worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
   worker->u.feedback_run.direct_pipe = NULL;
   worker->u.feedback_run.direct_worker = NULL;

   /* A dedicated out-of-pool thread is what try_no_queue asks for, so the
    * test must not be negated here. */
   if (try_no_queue)
     {
        PH(t);

        worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
        worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
        worker->no_queue = EINA_TRUE;

        eina_threads_init();

        if (PHC(t, _ecore_direct_worker, worker) == 0)
           return (Ecore_Thread *) worker;

        eina_threads_shutdown();
     }

   worker->no_queue = EINA_FALSE;

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);

   if (_ecore_thread_count == _ecore_thread_count_max)
     {
        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *) worker;
     }

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   eina_threads_init();

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
     {
        _ecore_thread_count++;
        return (Ecore_Thread *) worker;
     }

   eina_threads_shutdown();

 on_error:
   if (pth)
     {
        if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
        if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
        free(pth);
     }

   if (_ecore_thread_count == 0)
     {
        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
                                                               worker);
        LKU(_ecore_pending_job_threads_mutex);

        if (func_cancel) func_cancel((void *) data, NULL);

        if (worker)
          {
             ecore_pipe_del(worker->u.feedback_run.notify);
             free(worker);
             worker = NULL;
          }
     }

   return (Ecore_Thread *) worker;
#else
   Ecore_Pthread_Worker worker;

   (void) try_no_queue;

   /*
     With no thread support, and as we don't want to break apps that rely
     on this facility, we block the interface until the job is done.
    */
   worker.u.feedback_run.func_heavy = func_heavy;
   worker.u.feedback_run.func_notify = func_notify;
   worker.u.feedback_run.notify = NULL;
   worker.u.feedback_run.send = 0;
   worker.u.feedback_run.received = 0;
   worker.func_cancel = func_cancel;
   worker.func_end = func_end;
   worker.data = data;
   worker.cancel = EINA_FALSE;
   worker.feedback_run = EINA_TRUE;
   worker.kill = EINA_FALSE;

   do {
      worker.reschedule = EINA_FALSE;

      func_heavy((void *)data, (Ecore_Thread *) &worker);

      if (worker.cancel) func_cancel((void *)data, (Ecore_Thread *) &worker);
      else func_end((void *)data, (Ecore_Thread *) &worker);
   } while (worker.reschedule == EINA_TRUE);

   return NULL;
#endif
}

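/* Feedback use (illustrative sketch; the callback names are hypothetical):
 *
 *   static void _heavy(void *data, Ecore_Thread *th)
 *   {
 *      int i;
 *      for (i = 0; i < 100; i++)
 *        ecore_thread_feedback(th, (void *)(long) i); // from the worker
 *   }
 *   static void _notify(void *data, Ecore_Thread *th, void *msg)
 *   {
 *      // runs in the main loop once per feedback message
 *   }
 *
 *   ecore_thread_feedback_run(_heavy, _notify, _done, _abort, ctx, EINA_FALSE);
 *
 * Passing EINA_TRUE as try_no_queue requests a dedicated thread outside
 * the worker pool instead of queueing behind other jobs. */
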
EAPI Eina_Bool
ecore_thread_feedback(Ecore_Thread *thread, const void *data)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if (!worker) return EINA_FALSE;
   if (!worker->feedback_run) return EINA_FALSE;

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   worker->u.feedback_run.send++;
   ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));

   return EINA_TRUE;
#else
   worker->u.feedback_run.func_notify((void*) worker->data, thread, (void*) data);

   return EINA_TRUE;
#endif
}

EAPI Eina_Bool
ecore_thread_reschedule(Ecore_Thread *thread)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if (!worker) return EINA_FALSE;

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;
#endif

   worker->reschedule = EINA_TRUE;
   return EINA_TRUE;
}

EAPI int
ecore_thread_active_get(void)
{
#ifdef EFL_HAVE_THREADS
   return _ecore_thread_count;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_pending_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads);
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_pending_feedback_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads_feedback);
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_pending_total_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI int
ecore_thread_max_get(void)
{
   return _ecore_thread_count_max;
}

EAPI void
ecore_thread_max_set(int num)
{
   if (num < 1) return;
   /* avoid doing something hilarious by blocking dumb users */
   if (num >= (2 * eina_cpu_count())) return;

   _ecore_thread_count_max = num;
}

EAPI void
ecore_thread_max_reset(void)
{
   _ecore_thread_count_max = eina_cpu_count();
}

EAPI int
ecore_thread_available_get(void)
{
   int ret;
#ifdef EFL_HAVE_THREADS
   LKL(_ecore_pending_job_threads_mutex);
   ret = _ecore_thread_count_max - _ecore_thread_count;
   LKU(_ecore_pending_job_threads_mutex);
   return ret;
#else
   return 0;
#endif
}

EAPI Eina_Bool
ecore_thread_local_data_add(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
   Ecore_Thread_Data *d;
   Eina_Bool ret;

   if ((!thread) || (!key) || (!value))
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   if (!worker->hash)
     worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);

   if (!worker->hash)
     return EINA_FALSE;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return EINA_FALSE;

   d->data = value;
   d->cb = cb;

   if (direct)
     ret = eina_hash_direct_add(worker->hash, key, d);
   else
     ret = eina_hash_add(worker->hash, key, d);
   CDB(worker->cond);
   return ret;
#else
   return EINA_TRUE;
#endif
}

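/* Thread-local data use (illustrative sketch): inside a running
 * func_heavy/func_blocking callback,
 *
 *   ecore_thread_local_data_add(th, "state", state, free, EINA_FALSE);
 *   ...
 *   state = ecore_thread_local_data_find(th, "state");
 *
 * Only the thread that owns `th` may touch its hash, which is why every
 * accessor below checks PHE(worker->self, PHS()) first. */
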
EAPI void *
ecore_thread_local_data_set(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
   Ecore_Thread_Data *d, *r;
   void *ret;

   if ((!thread) || (!key) || (!value))
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return NULL;

   if (!worker->hash)
     worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);

   if (!worker->hash)
     return NULL;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return NULL;

   d->data = value;
   d->cb = cb;

   r = eina_hash_set(worker->hash, key, d);
   CDB(worker->cond);
   if (!r) return NULL; /* no previous value was stored under this key */
   ret = r->data;
   free(r);
   return ret;
#else
   return NULL;
#endif
}

EAPI void *
ecore_thread_local_data_find(Ecore_Thread *thread, const char *key)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
   Ecore_Thread_Data *d;

   if ((!thread) || (!key))
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return NULL;

   if (!worker->hash)
     return NULL;

   d = eina_hash_find(worker->hash, key);
   if (!d) return NULL;
   return d->data;
#else
   return NULL;
#endif
}

EAPI Eina_Bool
ecore_thread_local_data_del(Ecore_Thread *thread, const char *key)
{
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;

   if ((!thread) || (!key))
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   if (!worker->hash)
     return EINA_FALSE;
   /* The hash owns the Ecore_Thread_Data and frees it through
    * _ecore_thread_data_free() on deletion; freeing it here first
    * would be a double free. */
   return eina_hash_del_by_key(worker->hash, key);
#else
   return EINA_TRUE;
#endif
}

EAPI Eina_Bool
ecore_thread_global_data_add(const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
{
   Eina_Bool ret;
   Ecore_Thread_Data *d;

   if ((!key) || (!value))
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   LRWKWL(_ecore_thread_global_hash_lock);
   if (!_ecore_thread_global_hash)
     _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
   LRWKU(_ecore_thread_global_hash_lock);

   /* check before allocating d, so a failed hash creation cannot leak it */
   if (!_ecore_thread_global_hash)
     return EINA_FALSE;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return EINA_FALSE;

   d->data = value;
   d->cb = cb;

   LRWKWL(_ecore_thread_global_hash_lock);
   if (direct)
     ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
   else
     ret = eina_hash_add(_ecore_thread_global_hash, key, d);
   LRWKU(_ecore_thread_global_hash_lock);
   CDB(_ecore_thread_global_hash_cond);
   return ret;
#else
   return EINA_TRUE;
#endif
}

EAPI void *
ecore_thread_global_data_set(const char *key, void *value, Eina_Free_Cb cb)
{
   Ecore_Thread_Data *d, *r;
   void *ret;

   if ((!key) || (!value))
     return NULL;
#ifdef EFL_HAVE_THREADS
   LRWKWL(_ecore_thread_global_hash_lock);
   if (!_ecore_thread_global_hash)
     _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
   LRWKU(_ecore_thread_global_hash_lock);

   if (!_ecore_thread_global_hash)
     return NULL;

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))
     return NULL;

   d->data = value;
   d->cb = cb;

   LRWKWL(_ecore_thread_global_hash_lock);
   r = eina_hash_set(_ecore_thread_global_hash, key, d);
   LRWKU(_ecore_thread_global_hash_lock);
   CDB(_ecore_thread_global_hash_cond);

   if (!r) return NULL; /* no previous value was stored under this key */
   ret = r->data;
   free(r);
   return ret;
#else
   return NULL;
#endif
}

EAPI void *
ecore_thread_global_data_find(const char *key)
{
   Ecore_Thread_Data *ret;

   if (!key)
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash) return NULL;

   LRWKRL(_ecore_thread_global_hash_lock);
   ret = eina_hash_find(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);
   if (!ret) return NULL;
   return ret->data;
#else
   return NULL;
#endif
}

EAPI Eina_Bool
ecore_thread_global_data_del(const char *key)
{
   Eina_Bool ret;

   if (!key)
     return EINA_FALSE;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)
     return EINA_FALSE;

   /* The hash owns the Ecore_Thread_Data and frees it through
    * _ecore_thread_data_free() on deletion; freeing it here first
    * would be a double free. */
   LRWKWL(_ecore_thread_global_hash_lock);
   ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);
   return ret;
#else
   return EINA_TRUE;
#endif
}

EAPI void *
ecore_thread_global_data_wait(const char *key, double seconds)
{
   double tm = 0;
   Ecore_Thread_Data *ret = NULL;

   if (!key)
     return NULL;
#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)
     return NULL;
   if (seconds > 0)
     tm = ecore_time_get() + seconds;

   while (1)
     {
#ifndef _WIN32
        struct timespec t = { 0, 0 };

        t.tv_sec = (long int)tm;
        t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
#else
        struct timeval t = { 0, 0 };

        t.tv_sec = (long int)tm;
        t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
#endif
        LRWKRL(_ecore_thread_global_hash_lock);
        ret = eina_hash_find(_ecore_thread_global_hash, key);
        LRWKU(_ecore_thread_global_hash_lock);
        if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
          break;
        LKL(_ecore_thread_global_hash_mutex);
        CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
        LKU(_ecore_thread_global_hash_mutex);
     }
   if (ret) return ret->data;
   return NULL;
#else
   return NULL;
#endif
}
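
/* Cross-thread rendezvous (illustrative sketch):
 *
 *   // producer, in any thread
 *   ecore_thread_global_data_add("result", res, free, EINA_FALSE);
 *
 *   // consumer, blocks up to 2.0s for the key to appear
 *   void *res = ecore_thread_global_data_wait("result", 2.0);
 *
 * The add/set paths broadcast _ecore_thread_global_hash_cond, waking the
 * CDW() loop above so the waiter can re-check the hash. */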