12 #include "ecore_private.h"
14 #ifdef EFL_HAVE_THREADS
16 # ifdef EFL_HAVE_POSIX_THREADS
20 # include <sys/resource.h>
22 # include <sys/syscall.h>
26 # define PH(x) pthread_t x
27 # define PHE(x, y) pthread_equal(x, y)
28 # define PHS() pthread_self()
29 # define PHC(x, f, d) pthread_create(&(x), NULL, (void*) f, d)
30 # define PHJ(x, p) pthread_join(x, (void**)(&(p)))
31 # define PHA(x) pthread_cancel(x)
33 # define CD(x) pthread_cond_t x
34 # define CDI(x) pthread_cond_init(&(x), NULL);
35 # define CDD(x) pthread_cond_destroy(&(x));
36 # define CDB(x) pthread_cond_broadcast(&(x));
37 # define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
39 # define LK(x) pthread_mutex_t x
40 # define LKI(x) pthread_mutex_init(&(x), NULL);
41 # define LKD(x) pthread_mutex_destroy(&(x));
42 # define LKL(x) pthread_mutex_lock(&(x));
43 # define LKU(x) pthread_mutex_unlock(&(x));
45 # define LRWK(x) pthread_rwlock_t x
46 # define LRWKI(x) pthread_rwlock_init(&(x), NULL);
47 # define LRWKD(x) pthread_rwlock_destroy(&(x));
48 # define LRWKWL(x) pthread_rwlock_wrlock(&(x));
49 # define LRWKRL(x) pthread_rwlock_rdlock(&(x));
50 # define LRWKU(x) pthread_rwlock_unlock(&(x));
52 # else /* EFL_HAVE_WIN32_THREADS */
54 # define WIN32_LEAN_AND_MEAN
56 # undef WIN32_LEAN_AND_MEAN
64 # define PH(x) win32_thread *x
65 # define PHE(x, y) ((x) == (y))
66 # define PHS() (HANDLE)GetCurrentThreadId()
68 int _ecore_thread_win32_create(win32_thread **x, LPTHREAD_START_ROUTINE f, void *d)
71 t = (win32_thread *)calloc(1, sizeof(win32_thread));
75 (t)->thread = CreateThread(NULL, 0, f, d, 0, NULL);
86 # define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)
88 int _ecore_thread_win32_join(win32_thread *x, void **res)
92 WaitForSingleObject(x->thread, INFINITE);
93 CloseHandle(x->thread);
95 if (res) *res = x->val;
101 # define PHJ(x, p) _ecore_thread_win32_join(x, (void**)(&(p)))
102 # define PHA(x) TerminateThread(x->thread, 0)
104 # define LK(x) HANDLE x
105 # define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
106 # define LKD(x) CloseHandle(x)
107 # define LKL(x) WaitForSingleObject(x, INFINITE)
108 # define LKU(x) ReleaseMutex(x)
114 CRITICAL_SECTION threads_count_lock;
117 # define CD(x) win32_cond *x
121 x = (win32_cond *)calloc(1, sizeof(win32_cond)); \
124 x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
126 InitializeCriticalSection(&x->threads_count_lock); \
137 CloseHandle(x->semaphore); \
144 EnterCriticalSection(&x->threads_count_lock); \
145 if (x->threads_count > 0) \
146 ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
147 LeaveCriticalSection (&x->threads_count_lock); \
150 int _ecore_thread_win32_cond_timedwait(win32_cond *c, HANDLE *external_mutex, struct timeval *t)
153 DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
155 EnterCriticalSection (&c->threads_count_lock);
157 LeaveCriticalSection (&c->threads_count_lock);
159 res = WaitForSingleObject(c->semaphore, val);
160 if (res == WAIT_OBJECT_0)
165 # define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
178 # define LRWK(x) win32_rwl *x
181 x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
190 CDI(x->cond_write); \
191 if (!x->cond_write) \
218 CDD(x->cond_write); \
226 if (x->writers || x->readers > 0) \
228 x->writers_count++; \
229 while (x->writers || x->readers > 0) \
231 EnterCriticalSection(&x->cond_write->threads_count_lock); \
232 x->cond_read->threads_count++; \
233 LeaveCriticalSection(&x->cond_write->threads_count_lock); \
234 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
235 if (res != WAIT_OBJECT_0) break; \
237 x->writers_count--; \
239 if (res == 0) x->writers_count = 1; \
248 x->readers_count++; \
251 EnterCriticalSection(&x->cond_write->threads_count_lock); \
252 x->cond_read->threads_count++; \
253 LeaveCriticalSection(&x->cond_write->threads_count_lock); \
254 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
255 if (res != WAIT_OBJECT_0) break; \
257 x->readers_count--; \
269 if (x->readers_count == 1) \
271 EnterCriticalSection(&x->cond_read->threads_count_lock); \
272 if (x->cond_read->threads_count > 0) \
273 ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \
274 LeaveCriticalSection(&x->cond_read->threads_count_lock); \
276 else if (x->readers_count > 0) \
278 else if (x->writers_count > 0) \
280 EnterCriticalSection (&x->cond_write->threads_count_lock); \
281 if (x->cond_write->threads_count > 0) \
282 ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
283 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
286 else if (x->readers > 0) \
289 if (x->readers == 0 && x->writers_count > 0) \
291 EnterCriticalSection (&x->cond_write->threads_count_lock); \
292 if (x->cond_write->threads_count > 0) \
293 ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
294 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
304 typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
305 typedef struct _Ecore_Pthread Ecore_Pthread;
306 typedef struct _Ecore_Thread_Data Ecore_Thread_Data;
308 struct _Ecore_Thread_Data
314 struct _Ecore_Pthread_Worker
318 Ecore_Thread_Cb func_blocking;
321 Ecore_Thread_Cb func_heavy;
322 Ecore_Thread_Notify_Cb func_notify;
325 Ecore_Pipe *direct_pipe;
326 Ecore_Pthread_Worker *direct_worker;
333 Ecore_Thread_Cb func_cancel;
334 Ecore_Thread_Cb func_end;
335 #ifdef EFL_HAVE_THREADS
344 Eina_Bool cancel : 1;
345 Eina_Bool feedback_run : 1;
347 Eina_Bool reschedule : 1;
348 Eina_Bool no_queue : 1;
351 #ifdef EFL_HAVE_THREADS
352 typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
354 struct _Ecore_Pthread_Data
356 Ecore_Pthread_Worker *death_job;
363 static void _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte);
365 static int _ecore_thread_count_max = 0;
366 static int ECORE_THREAD_PIPE_DEL = 0;
367 static Eina_Array *_ecore_thread_pipe = NULL;
370 _ecore_thread_pipe_get(void)
372 if (eina_array_count_get(_ecore_thread_pipe) > 0)
373 return eina_array_pop(_ecore_thread_pipe);
375 return ecore_pipe_add(_ecore_thread_handler, NULL);
378 #ifdef EFL_HAVE_THREADS
379 static int _ecore_thread_count = 0;
381 static Ecore_Event_Handler *del_handler = NULL;
382 static Eina_List *_ecore_active_job_threads = NULL;
383 static Eina_List *_ecore_pending_job_threads = NULL;
384 static Eina_List *_ecore_pending_job_threads_feedback = NULL;
385 static LK(_ecore_pending_job_threads_mutex);
387 static Eina_Hash *_ecore_thread_global_hash = NULL;
388 static LRWK(_ecore_thread_global_hash_lock);
389 static LK(_ecore_thread_global_hash_mutex);
390 static CD(_ecore_thread_global_hash_cond);
392 static Eina_Bool have_main_loop_thread = 0;
394 static Eina_Trash *_ecore_thread_worker_trash = NULL;
395 static int _ecore_thread_worker_count = 0;
397 static void *_ecore_thread_worker(Ecore_Pthread_Data *pth);
398 static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);
400 static PH(get_main_loop_thread)(void)
402 static PH(main_loop_thread);
403 static pid_t main_loop_pid;
404 pid_t pid = getpid();
406 if (pid != main_loop_pid)
409 main_loop_thread = PHS();
410 have_main_loop_thread = 1;
413 return main_loop_thread;
417 _ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
419 if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
425 eina_trash_push(&_ecore_thread_worker_trash, worker);
429 _ecore_thread_data_free(void *data)
431 Ecore_Thread_Data *d = data;
433 if (d->cb) d->cb(d->data);
438 _ecore_thread_pipe_free(void *data __UNUSED__, void *event)
440 Ecore_Pipe *p = event;
442 if (eina_array_count_get(_ecore_thread_pipe) < 50)
443 eina_array_push(_ecore_thread_pipe, p);
446 eina_threads_shutdown();
450 _ecore_thread_pipe_del(void *data __UNUSED__, int type __UNUSED__, void *event __UNUSED__)
452 /* This is a hack to delay pipe destruction until we are out of its internal loop. */
453 return ECORE_CALLBACK_CANCEL;
457 _ecore_thread_end(Ecore_Pthread_Data *pth, Ecore_Thread *work)
459 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) work;
462 if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue))
463 _ecore_thread_count--;
465 if (PHJ(pth->thread, p) != 0)
468 if (eina_list_count(_ecore_pending_job_threads) > 0
469 && (unsigned int) _ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
470 && _ecore_thread_count < _ecore_thread_count_max)
472 /* One more thread should be created. */
473 INF("spawning threads because of still pending jobs.");
475 pth->death_job = _ecore_thread_worker_new();
476 if (!pth->p || !pth->death_job) goto end;
480 if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
482 _ecore_thread_count++;
486 eina_threads_shutdown();
489 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
492 _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
494 ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
499 _ecore_thread_kill(Ecore_Pthread_Worker *work)
503 if (work->func_cancel)
504 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
509 work->func_end((void *) work->data, (Ecore_Thread *) work);
512 if (work->feedback_run)
514 ecore_pipe_del(work->u.feedback_run.notify);
516 if (work->u.feedback_run.direct_pipe)
517 eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
518 if (work->u.feedback_run.direct_worker)
519 _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
524 eina_hash_free(work->hash);
529 _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
531 Ecore_Pthread_Worker *work;
533 if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
535 work = *(Ecore_Pthread_Worker **)buffer;
537 if (work->feedback_run)
539 if (work->u.feedback_run.send != work->u.feedback_run.received)
541 work->kill = EINA_TRUE;
546 _ecore_thread_kill(work);
550 _ecore_notify_handler(void *data, void *buffer, unsigned int nbyte)
552 Ecore_Pthread_Worker *work = data;
555 if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
557 user_data = *(void **)buffer;
558 work->u.feedback_run.received++;
560 if (work->u.feedback_run.func_notify)
561 work->u.feedback_run.func_notify((void *) work->data, (Ecore_Thread *) work, user_data);
563 /* Force reading all notify event before killing the thread */
564 if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
566 _ecore_thread_kill(work);
571 _ecore_short_job(Ecore_Pipe *end_pipe)
573 Ecore_Pthread_Worker *work;
575 while (_ecore_pending_job_threads)
577 LKL(_ecore_pending_job_threads_mutex);
579 if (!_ecore_pending_job_threads)
581 LKU(_ecore_pending_job_threads_mutex);
585 work = eina_list_data_get(_ecore_pending_job_threads);
586 _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
587 _ecore_pending_job_threads);
589 LKU(_ecore_pending_job_threads_mutex);
592 work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
594 if (work->reschedule)
596 work->reschedule = EINA_FALSE;
598 LKL(_ecore_pending_job_threads_mutex);
599 _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
600 LKU(_ecore_pending_job_threads_mutex);
604 ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
610 _ecore_feedback_job(Ecore_Pipe *end_pipe, PH(thread))
612 Ecore_Pthread_Worker *work;
614 while (_ecore_pending_job_threads_feedback)
616 LKL(_ecore_pending_job_threads_mutex);
618 if (!_ecore_pending_job_threads_feedback)
620 LKU(_ecore_pending_job_threads_mutex);
624 work = eina_list_data_get(_ecore_pending_job_threads_feedback);
625 _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
626 _ecore_pending_job_threads_feedback);
628 LKU(_ecore_pending_job_threads_mutex);
632 work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
634 if (work->reschedule)
636 work->reschedule = EINA_FALSE;
638 LKL(_ecore_pending_job_threads_mutex);
639 _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
640 LKU(_ecore_pending_job_threads_mutex);
644 ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
650 _ecore_direct_worker(Ecore_Pthread_Worker *work)
652 Ecore_Pthread_Data *pth;
654 #ifdef EFL_POSIX_THREADS
655 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
656 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
659 eina_sched_prio_drop();
661 pth = malloc(sizeof (Ecore_Pthread_Data));
662 if (!pth) return NULL;
664 pth->p = work->u.feedback_run.direct_pipe;
672 work->self = pth->thread;
673 work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
675 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
677 work = work->u.feedback_run.direct_worker;
685 work->u.short_run.func_blocking = NULL;
686 work->func_end = (void *) _ecore_thread_end;
687 work->func_cancel = NULL;
688 work->cancel = EINA_FALSE;
689 work->feedback_run = EINA_FALSE;
690 work->kill = EINA_FALSE;
695 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
701 _ecore_thread_worker(Ecore_Pthread_Data *pth)
703 Ecore_Pthread_Worker *work;
705 #ifdef EFL_POSIX_THREADS
706 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
707 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
710 eina_sched_prio_drop();
713 if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
714 if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
716 /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
718 LKL(_ecore_pending_job_threads_mutex);
719 if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
721 LKU(_ecore_pending_job_threads_mutex);
724 LKU(_ecore_pending_job_threads_mutex);
726 /* Sleep a little to prevent premature death */
728 Sleep(1); /* around 50ms */
733 LKL(_ecore_pending_job_threads_mutex);
734 if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
736 LKU(_ecore_pending_job_threads_mutex);
739 LKU(_ecore_pending_job_threads_mutex);
741 work = pth->death_job;
742 if (!work) return NULL;
745 work->u.short_run.func_blocking = NULL;
746 work->func_end = (void *) _ecore_thread_end;
747 work->func_cancel = NULL;
748 work->cancel = EINA_FALSE;
749 work->feedback_run = EINA_FALSE;
750 work->kill = EINA_FALSE;
755 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
762 static Ecore_Pthread_Worker *
763 _ecore_thread_worker_new(void)
765 Ecore_Pthread_Worker *result;
767 #ifdef EFL_HAVE_THREADS
768 result = eina_trash_pop(&_ecore_thread_worker_trash);
770 if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
771 else _ecore_thread_worker_count--;
775 return malloc(sizeof (Ecore_Pthread_Worker));
780 _ecore_thread_init(void)
782 _ecore_thread_count_max = eina_cpu_count();
783 if (_ecore_thread_count_max <= 0)
784 _ecore_thread_count_max = 1;
786 ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
787 _ecore_thread_pipe = eina_array_new(8);
789 #ifdef EFL_HAVE_THREADS
790 del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
792 LKI(_ecore_pending_job_threads_mutex);
793 LRWKI(_ecore_thread_global_hash_lock);
794 LKI(_ecore_thread_global_hash_mutex);
795 CDI(_ecore_thread_global_hash_cond);
800 _ecore_thread_shutdown(void)
802 /* FIXME: If function are still running in the background, should we kill them ? */
804 Eina_Array_Iterator it;
807 #ifdef EFL_HAVE_THREADS
808 Ecore_Pthread_Worker *work;
809 Ecore_Pthread_Data *pth;
811 LKL(_ecore_pending_job_threads_mutex);
813 EINA_LIST_FREE(_ecore_pending_job_threads, work)
815 if (work->func_cancel)
816 work->func_cancel((void *)work->data, (Ecore_Thread *) work);
820 EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
822 if (work->func_cancel)
823 work->func_cancel((void *)work->data, (Ecore_Thread *) work);
827 LKU(_ecore_pending_job_threads_mutex);
829 /* Improve emergency shutdown */
830 EINA_LIST_FREE(_ecore_active_job_threads, pth)
835 PHJ(pth->thread, ep);
837 ecore_pipe_del(pth->p);
839 if (_ecore_thread_global_hash)
840 eina_hash_free(_ecore_thread_global_hash);
841 ecore_event_handler_del(del_handler);
842 have_main_loop_thread = 0;
845 LKD(_ecore_pending_job_threads_mutex);
846 LRWKD(_ecore_thread_global_hash_lock);
847 LKD(_ecore_thread_global_hash_mutex);
848 CDD(_ecore_thread_global_hash_cond);
851 EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
854 eina_array_free(_ecore_thread_pipe);
855 _ecore_thread_pipe = NULL;
859 _ecore_thread_assert_main_loop_thread(const char *function)
862 #ifdef EFL_HAVE_THREADS
863 good = PHE(get_main_loop_thread(), PHS());
869 EINA_LOG_CRIT("Call to %s from wrong thread!", function);
875 ecore_thread_run(Ecore_Thread_Cb func_blocking,
876 Ecore_Thread_Cb func_end,
877 Ecore_Thread_Cb func_cancel,
880 Ecore_Pthread_Worker *work;
881 #ifdef EFL_HAVE_THREADS
882 Ecore_Pthread_Data *pth = NULL;
885 if (!func_blocking) return NULL;
887 work = _ecore_thread_worker_new();
891 func_cancel((void *) data, NULL);
895 work->u.short_run.func_blocking = func_blocking;
896 work->func_end = func_end;
897 work->func_cancel = func_cancel;
898 work->cancel = EINA_FALSE;
899 work->feedback_run = EINA_FALSE;
900 work->kill = EINA_FALSE;
901 work->reschedule = EINA_FALSE;
904 #ifdef EFL_HAVE_THREADS
909 LKL(_ecore_pending_job_threads_mutex);
910 _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
912 if (_ecore_thread_count == _ecore_thread_count_max)
914 LKU(_ecore_pending_job_threads_mutex);
915 return (Ecore_Thread *) work;
918 LKU(_ecore_pending_job_threads_mutex);
920 /* One more thread could be created. */
921 pth = malloc(sizeof (Ecore_Pthread_Data));
922 if (!pth) goto on_error;
924 pth->p = _ecore_thread_pipe_get();
925 pth->death_job = _ecore_thread_worker_new();
926 if (!pth->p || !pth->death_job) goto on_error;
930 if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
932 _ecore_thread_count++;
933 return (Ecore_Thread *) work;
936 eina_threads_shutdown();
941 if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
942 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
946 if (_ecore_thread_count == 0)
948 LKL(_ecore_pending_job_threads_mutex);
949 _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
950 LKU(_ecore_pending_job_threads_mutex);
952 if (work->func_cancel)
953 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
957 return (Ecore_Thread *) work;
960 If no thread and as we don't want to break app that rely on this
961 facility, we will lock the interface until we are done.
964 /* Handle reschedule by forcing it here. That would mean locking the app,
965 * would be better with an idler, but really to complex for a case where
966 * thread should really exist.
968 work->reschedule = EINA_FALSE;
970 func_blocking((void *)data, (Ecore_Thread *) work);
971 if (work->cancel == EINA_FALSE) func_end((void *)data, (Ecore_Thread *) work);
972 else func_cancel((void *)data, (Ecore_Thread *) work);
974 } while (work->reschedule == EINA_TRUE);
983 ecore_thread_cancel(Ecore_Thread *thread)
985 #ifdef EFL_HAVE_THREADS
986 Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
994 if (work->feedback_run)
998 if (work->u.feedback_run.send != work->u.feedback_run.received)
1002 LKL(_ecore_pending_job_threads_mutex);
1004 if ((have_main_loop_thread) &&
1005 (PHE(get_main_loop_thread(), PHS())))
1007 if (!work->feedback_run)
1008 EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
1010 if ((void *) work == (void *) thread)
1012 _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);
1014 LKU(_ecore_pending_job_threads_mutex);
1016 if (work->func_cancel)
1017 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
1024 EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
1026 if ((void *) work == (void *) thread)
1028 _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);
1030 LKU(_ecore_pending_job_threads_mutex);
1032 if (work->func_cancel)
1033 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
1041 LKU(_ecore_pending_job_threads_mutex);
1043 /* Delay the destruction */
1045 ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
1053 ecore_thread_check(Ecore_Thread *thread)
1055 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1057 if (!worker) return EINA_TRUE;
1058 return worker->cancel;
1061 EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1062 Ecore_Thread_Notify_Cb func_notify,
1063 Ecore_Thread_Cb func_end,
1064 Ecore_Thread_Cb func_cancel,
1066 Eina_Bool try_no_queue)
1069 #ifdef EFL_HAVE_THREADS
1070 Ecore_Pthread_Worker *worker;
1071 Ecore_Pthread_Data *pth = NULL;
1073 if (!func_heavy) return NULL;
1075 worker = _ecore_thread_worker_new();
1076 if (!worker) goto on_error;
1078 worker->u.feedback_run.func_heavy = func_heavy;
1079 worker->u.feedback_run.func_notify = func_notify;
1080 worker->hash = NULL;
1083 worker->func_cancel = func_cancel;
1084 worker->func_end = func_end;
1085 worker->data = data;
1086 worker->cancel = EINA_FALSE;
1087 worker->feedback_run = EINA_TRUE;
1088 worker->kill = EINA_FALSE;
1089 worker->reschedule = EINA_FALSE;
1091 worker->u.feedback_run.send = 0;
1092 worker->u.feedback_run.received = 0;
1094 worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
1095 worker->u.feedback_run.direct_pipe = NULL;
1096 worker->u.feedback_run.direct_worker = NULL;
1102 worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
1103 worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
1104 worker->no_queue = EINA_TRUE;
1106 eina_threads_init();
1108 if (PHC(t, _ecore_direct_worker, worker) == 0)
1109 return (Ecore_Thread *) worker;
1111 eina_threads_shutdown();
1114 worker->no_queue = EINA_FALSE;
1116 LKL(_ecore_pending_job_threads_mutex);
1117 _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);
1119 if (_ecore_thread_count == _ecore_thread_count_max)
1121 LKU(_ecore_pending_job_threads_mutex);
1122 return (Ecore_Thread *) worker;
1125 LKU(_ecore_pending_job_threads_mutex);
1127 /* One more thread could be created. */
1128 pth = malloc(sizeof (Ecore_Pthread_Data));
1129 if (!pth) goto on_error;
1131 pth->p = _ecore_thread_pipe_get();
1132 pth->death_job = _ecore_thread_worker_new();
1133 if (!pth->p || !pth->death_job) goto on_error;
1135 eina_threads_init();
1137 if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
1139 _ecore_thread_count++;
1140 return (Ecore_Thread *) worker;
1143 eina_threads_shutdown();
1148 if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
1149 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
1153 if (_ecore_thread_count == 0)
1155 LKL(_ecore_pending_job_threads_mutex);
1156 _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
1158 LKU(_ecore_pending_job_threads_mutex);
1160 if (func_cancel) func_cancel((void *) data, NULL);
1164 ecore_pipe_del(worker->u.feedback_run.notify);
1170 return (Ecore_Thread *) worker;
1172 Ecore_Pthread_Worker worker;
1174 (void) try_no_queue;
1177 If no thread and as we don't want to break app that rely on this
1178 facility, we will lock the interface until we are done.
1180 worker.u.feedback_run.func_heavy = func_heavy;
1181 worker.u.feedback_run.func_notify = func_notify;
1182 worker.u.feedback_run.notify = NULL;
1183 worker.u.feedback_run.send = 0;
1184 worker.u.feedback_run.received = 0;
1185 worker.func_cancel = func_cancel;
1186 worker.func_end = func_end;
1188 worker.cancel = EINA_FALSE;
1189 worker.feedback_run = EINA_TRUE;
1190 worker.kill = EINA_FALSE;
1193 worker.reschedule = EINA_FALSE;
1195 func_heavy((void *)data, (Ecore_Thread *) &worker);
1197 if (worker.cancel) func_cancel((void *)data, (Ecore_Thread *) &worker);
1198 else func_end((void *)data, (Ecore_Thread *) &worker);
1199 } while (worker.reschedule == EINA_TRUE);
1206 ecore_thread_feedback(Ecore_Thread *thread, const void *data)
1208 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1210 if (!worker) return EINA_FALSE;
1211 if (!worker->feedback_run) return EINA_FALSE;
1213 #ifdef EFL_HAVE_THREADS
1214 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1216 worker->u.feedback_run.send++;
1217 ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
1221 worker->u.feedback_run.func_notify((void*) worker->data, thread, (void*) data);
1228 ecore_thread_reschedule(Ecore_Thread *thread)
1230 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1232 if (!worker) return EINA_FALSE;
1234 #ifdef EFL_HAVE_THREADS
1235 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1238 worker->reschedule = EINA_TRUE;
1243 ecore_thread_active_get(void)
1245 #ifdef EFL_HAVE_THREADS
1246 return _ecore_thread_count;
1253 ecore_thread_pending_get(void)
1256 #ifdef EFL_HAVE_THREADS
1257 LKL(_ecore_pending_job_threads_mutex);
1258 ret = eina_list_count(_ecore_pending_job_threads);
1259 LKU(_ecore_pending_job_threads_mutex);
1267 ecore_thread_pending_feedback_get(void)
1270 #ifdef EFL_HAVE_THREADS
1271 LKL(_ecore_pending_job_threads_mutex);
1272 ret = eina_list_count(_ecore_pending_job_threads_feedback);
1273 LKU(_ecore_pending_job_threads_mutex);
1281 ecore_thread_pending_total_get(void)
1284 #ifdef EFL_HAVE_THREADS
1285 LKL(_ecore_pending_job_threads_mutex);
1286 ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
1287 LKU(_ecore_pending_job_threads_mutex);
1295 ecore_thread_max_get(void)
1297 return _ecore_thread_count_max;
1301 ecore_thread_max_set(int num)
1303 if (num < 1) return;
1304 /* avoid doing something hilarious by blocking dumb users */
1305 if (num >= (2 * eina_cpu_count())) return;
1307 _ecore_thread_count_max = num;
1311 ecore_thread_max_reset(void)
1313 _ecore_thread_count_max = eina_cpu_count();
1317 ecore_thread_available_get(void)
1320 #ifdef EFL_HAVE_THREADS
1321 LKL(_ecore_pending_job_threads_mutex);
1322 ret = _ecore_thread_count_max - _ecore_thread_count;
1323 LKU(_ecore_pending_job_threads_mutex);
1331 ecore_thread_local_data_add(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
1333 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1334 Ecore_Thread_Data *d;
1337 if ((!thread) || (!key) || (!value))
1339 #ifdef EFL_HAVE_THREADS
1340 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1343 worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1348 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1355 ret = eina_hash_direct_add(worker->hash, key, d);
1357 ret = eina_hash_add(worker->hash, key, d);
1366 ecore_thread_local_data_set(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb)
1368 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1369 Ecore_Thread_Data *d, *r;
1371 if ((!thread) || (!key) || (!value))
1373 #ifdef EFL_HAVE_THREADS
1374 if (!PHE(worker->self, PHS())) return NULL;
1377 worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1382 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1388 r = eina_hash_set(worker->hash, key, d);
1400 ecore_thread_local_data_find(Ecore_Thread *thread, const char *key)
1402 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1403 Ecore_Thread_Data *d;
1405 if ((!thread) || (!key))
1407 #ifdef EFL_HAVE_THREADS
1408 if (!PHE(worker->self, PHS())) return NULL;
1413 d = eina_hash_find(worker->hash, key);
1423 ecore_thread_local_data_del(Ecore_Thread *thread, const char *key)
1425 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1426 if ((!thread) || (!key))
1428 #ifdef EFL_HAVE_THREADS
1429 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1433 return eina_hash_del_by_key(worker->hash, key);
1440 ecore_thread_global_data_add(const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
1443 Ecore_Thread_Data *d;
1445 if ((!key) || (!value))
1447 #ifdef EFL_HAVE_THREADS
1448 LRWKWL(_ecore_thread_global_hash_lock);
1449 if (!_ecore_thread_global_hash)
1450 _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1451 LRWKU(_ecore_thread_global_hash_lock);
1453 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1459 if (!_ecore_thread_global_hash)
1461 LRWKWL(_ecore_thread_global_hash_lock);
1463 ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
1465 ret = eina_hash_add(_ecore_thread_global_hash, key, d);
1466 LRWKU(_ecore_thread_global_hash_lock);
1467 CDB(_ecore_thread_global_hash_cond);
1475 ecore_thread_global_data_set(const char *key, void *value, Eina_Free_Cb cb)
1477 Ecore_Thread_Data *d, *r;
1480 if ((!key) || (!value))
1482 #ifdef EFL_HAVE_THREADS
1483 LRWKWL(_ecore_thread_global_hash_lock);
1484 if (!_ecore_thread_global_hash)
1485 _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1486 LRWKU(_ecore_thread_global_hash_lock);
1488 if (!_ecore_thread_global_hash)
1491 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1497 LRWKWL(_ecore_thread_global_hash_lock);
1498 r = eina_hash_set(_ecore_thread_global_hash, key, d);
1499 LRWKU(_ecore_thread_global_hash_lock);
1500 CDB(_ecore_thread_global_hash_cond);
1512 ecore_thread_global_data_find(const char *key)
1514 Ecore_Thread_Data *ret;
1517 #ifdef EFL_HAVE_THREADS
1518 if (!_ecore_thread_global_hash) return NULL;
1520 LRWKRL(_ecore_thread_global_hash_lock);
1521 ret = eina_hash_find(_ecore_thread_global_hash, key);
1522 LRWKU(_ecore_thread_global_hash_lock);
1532 ecore_thread_global_data_del(const char *key)
1538 #ifdef EFL_HAVE_THREADS
1539 if (!_ecore_thread_global_hash)
1542 LRWKWL(_ecore_thread_global_hash_lock);
1543 ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
1544 LRWKU(_ecore_thread_global_hash_lock);
1552 ecore_thread_global_data_wait(const char *key, double seconds)
1555 Ecore_Thread_Data *ret = NULL;
1559 #ifdef EFL_HAVE_THREADS
1560 if (!_ecore_thread_global_hash)
1563 tm = ecore_time_get() + seconds;
1568 struct timespec t = { 0, 0 };
1570 t.tv_sec = (long int)tm;
1571 t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
1573 struct timeval t = { 0, 0 };
1575 t.tv_sec = (long int)tm;
1576 t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
1578 LRWKRL(_ecore_thread_global_hash_lock);
1579 ret = eina_hash_find(_ecore_thread_global_hash, key);
1580 LRWKU(_ecore_thread_global_hash_lock);
1581 if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
1583 LKL(_ecore_thread_global_hash_mutex);
1584 CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
1585 LKU(_ecore_thread_global_hash_mutex);
1587 if (ret) return ret->data;