13 #include "ecore_private.h"
15 #ifdef EFL_HAVE_THREADS
17 # ifdef EFL_HAVE_POSIX_THREADS
21 # include <sys/resource.h>
23 # include <sys/syscall.h>
/* Thread-primitive abstraction macros, POSIX flavour.
 * The same macro names are redefined over Win32 primitives in the
 * #else branch below, so the rest of the file stays backend-agnostic.
 *   PH*   - thread handle: declare / compare / self / create / join / cancel
 *   CD*   - condition variable: declare / init / destroy / broadcast / timedwait
 *   LK*   - mutex: declare / init / destroy / lock / unlock
 *   LRWK* - rwlock: declare / init / destroy / wrlock / rdlock / unlock
 * NOTE(review): the CD/LK/LRWK operation macros expand with a trailing ';'
 * while the PH* ones do not -- call sites must match that asymmetry. */
27 # define PH(x) pthread_t x
28 # define PHE(x, y) pthread_equal(x, y)
29 # define PHS() pthread_self()
30 # define PHC(x, f, d) pthread_create(&(x), NULL, (void*) f, d)
31 # define PHJ(x, p) pthread_join(x, (void**)(&(p)))
32 # define PHA(x) pthread_cancel(x)
34 # define CD(x) pthread_cond_t x
35 # define CDI(x) pthread_cond_init(&(x), NULL);
36 # define CDD(x) pthread_cond_destroy(&(x));
37 # define CDB(x) pthread_cond_broadcast(&(x));
38 # define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);
40 # define LK(x) pthread_mutex_t x
41 # define LKI(x) pthread_mutex_init(&(x), NULL);
42 # define LKD(x) pthread_mutex_destroy(&(x));
43 # define LKL(x) pthread_mutex_lock(&(x));
44 # define LKU(x) pthread_mutex_unlock(&(x));
46 # define LRWK(x) pthread_rwlock_t x
47 # define LRWKI(x) pthread_rwlock_init(&(x), NULL);
48 # define LRWKD(x) pthread_rwlock_destroy(&(x));
49 # define LRWKWL(x) pthread_rwlock_wrlock(&(x));
50 # define LRWKRL(x) pthread_rwlock_rdlock(&(x));
51 # define LRWKU(x) pthread_rwlock_unlock(&(x));
53 # else /* EFL_HAVE_WIN32_THREADS */
55 # define WIN32_LEAN_AND_MEAN
57 # undef WIN32_LEAN_AND_MEAN
65 # define PH(x) win32_thread *x
66 # define PHE(x, y) ((x) == (y))
67 # define PHS() (HANDLE)GetCurrentThreadId()
69 int _ecore_thread_win32_create(win32_thread **x, LPTHREAD_START_ROUTINE f, void *d)
72 t = (win32_thread *)calloc(1, sizeof(win32_thread));
76 (t)->thread = CreateThread(NULL, 0, f, d, 0, NULL);
87 # define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)
89 int _ecore_thread_win32_join(win32_thread *x, void **res)
93 WaitForSingleObject(x->thread, INFINITE);
94 CloseHandle(x->thread);
96 if (res) *res = x->val;
102 # define PHJ(x, p) _ecore_thread_win32_join(x, (void**)(&(p)))
103 # define PHA(x) TerminateThread(x->thread, 0)
/* Win32 mutex backend: LK* map onto kernel Mutex HANDLEs.
 * NOTE(review): a Win32 Mutex object is heavier than a CRITICAL_SECTION;
 * presumably chosen so locks can be waited on via WaitForSingleObject like
 * the other handles in this file -- confirm before changing. */
105 # define LK(x) HANDLE x
106 # define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
107 # define LKD(x) CloseHandle(x)
108 # define LKL(x) WaitForSingleObject(x, INFINITE)
109 # define LKU(x) ReleaseMutex(x)
115 CRITICAL_SECTION threads_count_lock;
118 # define CD(x) win32_cond *x
122 x = (win32_cond *)calloc(1, sizeof(win32_cond)); \
125 x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \
127 InitializeCriticalSection(&x->threads_count_lock); \
138 CloseHandle(x->semaphore); \
145 EnterCriticalSection(&x->threads_count_lock); \
146 if (x->threads_count > 0) \
147 ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
148 LeaveCriticalSection (&x->threads_count_lock); \
151 int _ecore_thread_win32_cond_timedwait(win32_cond *c, HANDLE *external_mutex, struct timeval *t)
154 DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);
156 EnterCriticalSection (&c->threads_count_lock);
158 LeaveCriticalSection (&c->threads_count_lock);
160 res = WaitForSingleObject(c->semaphore, val);
161 if (res == WAIT_OBJECT_0)
166 # define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)
179 # define LRWK(x) win32_rwl *x
182 x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \
191 CDI(x->cond_write); \
192 if (!x->cond_write) \
219 CDD(x->cond_write); \
227 if (x->writers || x->readers > 0) \
229 x->writers_count++; \
230 while (x->writers || x->readers > 0) \
232 EnterCriticalSection(&x->cond_write->threads_count_lock); \
233 x->cond_read->threads_count++; \
234 LeaveCriticalSection(&x->cond_write->threads_count_lock); \
235 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
236 if (res != WAIT_OBJECT_0) break; \
238 x->writers_count--; \
240 if (res == 0) x->writers_count = 1; \
249 x->readers_count++; \
252 EnterCriticalSection(&x->cond_write->threads_count_lock); \
253 x->cond_read->threads_count++; \
254 LeaveCriticalSection(&x->cond_write->threads_count_lock); \
255 res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
256 if (res != WAIT_OBJECT_0) break; \
258 x->readers_count--; \
270 if (x->readers_count == 1) \
272 EnterCriticalSection(&x->cond_read->threads_count_lock); \
273 if (x->cond_read->threads_count > 0) \
274 ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \
275 LeaveCriticalSection(&x->cond_read->threads_count_lock); \
277 else if (x->readers_count > 0) \
279 else if (x->writers_count > 0) \
281 EnterCriticalSection (&x->cond_write->threads_count_lock); \
282 if (x->cond_write->threads_count > 0) \
283 ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
284 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
287 else if (x->readers > 0) \
290 if (x->readers == 0 && x->writers_count > 0) \
292 EnterCriticalSection (&x->cond_write->threads_count_lock); \
293 if (x->cond_write->threads_count > 0) \
294 ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
295 LeaveCriticalSection (&x->cond_write->threads_count_lock); \
305 typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
306 typedef struct _Ecore_Pthread Ecore_Pthread;
307 typedef struct _Ecore_Thread_Data Ecore_Thread_Data;
309 struct _Ecore_Thread_Data
315 struct _Ecore_Pthread_Worker
319 Ecore_Thread_Cb func_blocking;
322 Ecore_Thread_Cb func_heavy;
323 Ecore_Thread_Notify_Cb func_notify;
326 Ecore_Pipe *direct_pipe;
327 Ecore_Pthread_Worker *direct_worker;
334 Ecore_Thread_Cb func_cancel;
335 Ecore_Thread_Cb func_end;
336 #ifdef EFL_HAVE_THREADS
345 Eina_Bool cancel : 1;
346 Eina_Bool feedback_run : 1;
348 Eina_Bool reschedule : 1;
349 Eina_Bool no_queue : 1;
352 #ifdef EFL_HAVE_THREADS
353 typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
355 struct _Ecore_Pthread_Data
357 Ecore_Pthread_Worker *death_job;
364 static void _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte);
366 static int _ecore_thread_count_max = 0;
367 static int ECORE_THREAD_PIPE_DEL = 0;
368 static Eina_Array *_ecore_thread_pipe = NULL;
371 _ecore_thread_pipe_get(void)
373 if (eina_array_count_get(_ecore_thread_pipe) > 0)
374 return eina_array_pop(_ecore_thread_pipe);
376 return ecore_pipe_add(_ecore_thread_handler, NULL);
379 #ifdef EFL_HAVE_THREADS
380 static int _ecore_thread_count = 0;
382 static Ecore_Event_Handler *del_handler = NULL;
383 static Eina_List *_ecore_active_job_threads = NULL;
384 static Eina_List *_ecore_pending_job_threads = NULL;
385 static Eina_List *_ecore_pending_job_threads_feedback = NULL;
386 static LK(_ecore_pending_job_threads_mutex);
388 static Eina_Hash *_ecore_thread_global_hash = NULL;
389 static LRWK(_ecore_thread_global_hash_lock);
390 static LK(_ecore_thread_global_hash_mutex);
391 static CD(_ecore_thread_global_hash_cond);
393 static LK(_ecore_main_loop_mutex);
394 static Eina_Bool have_main_loop_thread = 0;
396 static Eina_Trash *_ecore_thread_worker_trash = NULL;
397 static int _ecore_thread_worker_count = 0;
399 static void *_ecore_thread_worker(Ecore_Pthread_Data *pth);
400 static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);
402 static PH(get_main_loop_thread)(void)
404 static PH(main_loop_thread);
405 static pid_t main_loop_pid;
406 pid_t pid = getpid();
408 if (pid != main_loop_pid)
411 main_loop_thread = PHS();
412 have_main_loop_thread = 1;
415 return main_loop_thread;
419 _ecore_thread_worker_free(Ecore_Pthread_Worker *worker)
421 if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)
427 eina_trash_push(&_ecore_thread_worker_trash, worker);
431 _ecore_thread_data_free(void *data)
433 Ecore_Thread_Data *d = data;
435 if (d->cb) d->cb(d->data);
440 _ecore_thread_pipe_free(void *data __UNUSED__, void *event)
442 Ecore_Pipe *p = event;
444 if (eina_array_count_get(_ecore_thread_pipe) < 50)
445 eina_array_push(_ecore_thread_pipe, p);
448 eina_threads_shutdown();
452 _ecore_thread_pipe_del(void *data __UNUSED__, int type __UNUSED__, void *event __UNUSED__)
454 /* This is a hack to delay pipe destruction until we are out of its internal loop. */
455 return ECORE_CALLBACK_CANCEL;
459 _ecore_thread_end(Ecore_Pthread_Data *pth, Ecore_Thread *work)
461 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) work;
464 if (!worker->feedback_run || (worker->feedback_run && !worker->no_queue))
465 _ecore_thread_count--;
467 if (PHJ(pth->thread, p) != 0)
470 if (eina_list_count(_ecore_pending_job_threads) > 0
471 && (unsigned int) _ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
472 && _ecore_thread_count < _ecore_thread_count_max)
474 /* One more thread should be created. */
475 INF("spawning threads because of still pending jobs.");
477 pth->death_job = _ecore_thread_worker_new();
478 if (!pth->p || !pth->death_job) goto end;
482 if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
484 _ecore_thread_count++;
488 eina_threads_shutdown();
491 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
494 _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
496 ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
501 _ecore_thread_kill(Ecore_Pthread_Worker *work)
505 if (work->func_cancel)
506 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
511 work->func_end((void *) work->data, (Ecore_Thread *) work);
514 if (work->feedback_run)
516 ecore_pipe_del(work->u.feedback_run.notify);
518 if (work->u.feedback_run.direct_pipe)
519 eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
520 if (work->u.feedback_run.direct_worker)
521 _ecore_thread_worker_free(work->u.feedback_run.direct_worker);
526 eina_hash_free(work->hash);
531 _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
533 Ecore_Pthread_Worker *work;
535 if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
537 work = *(Ecore_Pthread_Worker **)buffer;
539 if (work->feedback_run)
541 if (work->u.feedback_run.send != work->u.feedback_run.received)
543 work->kill = EINA_TRUE;
548 _ecore_thread_kill(work);
552 _ecore_notify_handler(void *data, void *buffer, unsigned int nbyte)
554 Ecore_Pthread_Worker *work = data;
557 if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
559 user_data = *(void **)buffer;
560 work->u.feedback_run.received++;
562 if (work->u.feedback_run.func_notify)
563 work->u.feedback_run.func_notify((void *) work->data, (Ecore_Thread *) work, user_data);
565 /* Force reading all notify event before killing the thread */
566 if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
568 _ecore_thread_kill(work);
/* Drain the pending short-job queue from a worker thread.
 * Pops one job at a time under _ecore_pending_job_threads_mutex, runs its
 * blocking function with no lock held, re-queues the job when it asked to
 * be rescheduled, and otherwise reports completion through end_pipe so the
 * main loop fires func_end/func_cancel via _ecore_thread_handler. */
573 _ecore_short_job(Ecore_Pipe *end_pipe)
575 Ecore_Pthread_Worker *work;
577 while (_ecore_pending_job_threads)
579 LKL(_ecore_pending_job_threads_mutex);
     /* Re-check under the lock: another worker may have emptied the list
      * between the unlocked while-test and acquiring the mutex. */
581 if (!_ecore_pending_job_threads)
583 LKU(_ecore_pending_job_threads_mutex);
587 work = eina_list_data_get(_ecore_pending_job_threads);
588 _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
589 _ecore_pending_job_threads);
591 LKU(_ecore_pending_job_threads_mutex);
     /* User callback runs outside the lock so other workers can pop jobs. */
594 work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work);
596 if (work->reschedule)
598 work->reschedule = EINA_FALSE;
600 LKL(_ecore_pending_job_threads_mutex);
601 _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
602 LKU(_ecore_pending_job_threads_mutex);
     /* Hand the finished job back to the main loop. */
606 ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
/* Drain the pending feedback-job queue from a worker thread.
 * Same pop / run / requeue-or-report cycle as _ecore_short_job(), but over
 * _ecore_pending_job_threads_feedback and invoking the heavy function.
 * NOTE(review): the 'thread' parameter is not used in the visible lines --
 * presumably stored into work->self in an elided line; confirm. */
612 _ecore_feedback_job(Ecore_Pipe *end_pipe, PH(thread))
614 Ecore_Pthread_Worker *work;
616 while (_ecore_pending_job_threads_feedback)
618 LKL(_ecore_pending_job_threads_mutex);
     /* Re-check under the lock; the unlocked while-test can race. */
620 if (!_ecore_pending_job_threads_feedback)
622 LKU(_ecore_pending_job_threads_mutex);
626 work = eina_list_data_get(_ecore_pending_job_threads_feedback);
627 _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
628 _ecore_pending_job_threads_feedback);
630 LKU(_ecore_pending_job_threads_mutex);
     /* Heavy user callback runs with no lock held. */
634 work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
636 if (work->reschedule)
638 work->reschedule = EINA_FALSE;
640 LKL(_ecore_pending_job_threads_mutex);
641 _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
642 LKU(_ecore_pending_job_threads_mutex);
     /* Report completion to the main loop. */
646 ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
652 _ecore_direct_worker(Ecore_Pthread_Worker *work)
654 Ecore_Pthread_Data *pth;
656 #ifdef EFL_POSIX_THREADS
657 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
658 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
661 eina_sched_prio_drop();
663 pth = malloc(sizeof (Ecore_Pthread_Data));
664 if (!pth) return NULL;
666 pth->p = work->u.feedback_run.direct_pipe;
674 work->self = pth->thread;
675 work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work);
677 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
679 work = work->u.feedback_run.direct_worker;
687 work->u.short_run.func_blocking = NULL;
688 work->func_end = (void *) _ecore_thread_end;
689 work->func_cancel = NULL;
690 work->cancel = EINA_FALSE;
691 work->feedback_run = EINA_FALSE;
692 work->kill = EINA_FALSE;
697 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
703 _ecore_thread_worker(Ecore_Pthread_Data *pth)
705 Ecore_Pthread_Worker *work;
707 #ifdef EFL_POSIX_THREADS
708 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
709 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
712 eina_sched_prio_drop();
715 if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
716 if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
718 /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
720 LKL(_ecore_pending_job_threads_mutex);
721 if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
723 LKU(_ecore_pending_job_threads_mutex);
726 LKU(_ecore_pending_job_threads_mutex);
728 /* Sleep a little to prevent premature death */
730 Sleep(1); /* around 50ms */
735 LKL(_ecore_pending_job_threads_mutex);
736 if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)
738 LKU(_ecore_pending_job_threads_mutex);
741 LKU(_ecore_pending_job_threads_mutex);
743 work = pth->death_job;
744 if (!work) return NULL;
747 work->u.short_run.func_blocking = NULL;
748 work->func_end = (void *) _ecore_thread_end;
749 work->func_cancel = NULL;
750 work->cancel = EINA_FALSE;
751 work->feedback_run = EINA_FALSE;
752 work->kill = EINA_FALSE;
757 ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
764 static Ecore_Pthread_Worker *
765 _ecore_thread_worker_new(void)
767 Ecore_Pthread_Worker *result;
769 #ifdef EFL_HAVE_THREADS
770 result = eina_trash_pop(&_ecore_thread_worker_trash);
772 if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
773 else _ecore_thread_worker_count--;
777 return malloc(sizeof (Ecore_Pthread_Worker));
782 _ecore_thread_init(void)
784 _ecore_thread_count_max = eina_cpu_count();
785 if (_ecore_thread_count_max <= 0)
786 _ecore_thread_count_max = 1;
788 ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
789 _ecore_thread_pipe = eina_array_new(8);
791 #ifdef EFL_HAVE_THREADS
792 del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
794 LKI(_ecore_pending_job_threads_mutex);
795 LRWKI(_ecore_thread_global_hash_lock);
796 LKI(_ecore_thread_global_hash_mutex);
797 LKI(_ecore_main_loop_mutex);
798 CDI(_ecore_thread_global_hash_cond);
803 _ecore_thread_shutdown(void)
805 /* FIXME: If function are still running in the background, should we kill them ? */
807 Eina_Array_Iterator it;
810 #ifdef EFL_HAVE_THREADS
811 Ecore_Pthread_Worker *work;
812 Ecore_Pthread_Data *pth;
814 LKL(_ecore_pending_job_threads_mutex);
816 EINA_LIST_FREE(_ecore_pending_job_threads, work)
818 if (work->func_cancel)
819 work->func_cancel((void *)work->data, (Ecore_Thread *) work);
823 EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
825 if (work->func_cancel)
826 work->func_cancel((void *)work->data, (Ecore_Thread *) work);
830 LKU(_ecore_pending_job_threads_mutex);
832 /* Improve emergency shutdown */
833 EINA_LIST_FREE(_ecore_active_job_threads, pth)
838 PHJ(pth->thread, ep);
840 ecore_pipe_del(pth->p);
842 if (_ecore_thread_global_hash)
843 eina_hash_free(_ecore_thread_global_hash);
844 ecore_event_handler_del(del_handler);
845 have_main_loop_thread = 0;
848 LKD(_ecore_pending_job_threads_mutex);
849 LRWKD(_ecore_thread_global_hash_lock);
850 LKD(_ecore_thread_global_hash_mutex);
851 CDD(_ecore_thread_global_hash_cond);
854 EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)
857 eina_array_free(_ecore_thread_pipe);
858 _ecore_thread_pipe = NULL;
862 _ecore_thread_assert_main_loop_thread(const char *function)
865 #ifdef EFL_HAVE_THREADS
866 good = PHE(get_main_loop_thread(), PHS());
872 EINA_LOG_CRIT("Call to %s from wrong thread!", function);
879 #ifdef HAVE_THREAD_SAFETY
880 static int lock_count;
885 LKL(_ecore_main_loop_mutex);
887 assert(lock_count == 1);
894 assert(lock_count == 0);
895 LKU(_ecore_main_loop_mutex);
900 ecore_thread_run(Ecore_Thread_Cb func_blocking,
901 Ecore_Thread_Cb func_end,
902 Ecore_Thread_Cb func_cancel,
905 Ecore_Pthread_Worker *work;
906 #ifdef EFL_HAVE_THREADS
907 Ecore_Pthread_Data *pth = NULL;
910 if (!func_blocking) return NULL;
912 work = _ecore_thread_worker_new();
916 func_cancel((void *) data, NULL);
920 work->u.short_run.func_blocking = func_blocking;
921 work->func_end = func_end;
922 work->func_cancel = func_cancel;
923 work->cancel = EINA_FALSE;
924 work->feedback_run = EINA_FALSE;
925 work->kill = EINA_FALSE;
926 work->reschedule = EINA_FALSE;
929 #ifdef EFL_HAVE_THREADS
934 LKL(_ecore_pending_job_threads_mutex);
935 _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
937 if (_ecore_thread_count == _ecore_thread_count_max)
939 LKU(_ecore_pending_job_threads_mutex);
940 return (Ecore_Thread *) work;
943 LKU(_ecore_pending_job_threads_mutex);
945 /* One more thread could be created. */
946 pth = malloc(sizeof (Ecore_Pthread_Data));
947 if (!pth) goto on_error;
949 pth->p = _ecore_thread_pipe_get();
950 pth->death_job = _ecore_thread_worker_new();
951 if (!pth->p || !pth->death_job) goto on_error;
955 if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
957 _ecore_thread_count++;
958 return (Ecore_Thread *) work;
961 eina_threads_shutdown();
966 if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
967 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
971 if (_ecore_thread_count == 0)
973 LKL(_ecore_pending_job_threads_mutex);
974 _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
975 LKU(_ecore_pending_job_threads_mutex);
977 if (work->func_cancel)
978 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
982 return (Ecore_Thread *) work;
985 If no thread and as we don't want to break app that rely on this
986 facility, we will lock the interface until we are done.
989 /* Handle reschedule by forcing it here. That would mean locking the app,
990 * would be better with an idler, but really to complex for a case where
991 * thread should really exist.
993 work->reschedule = EINA_FALSE;
995 func_blocking((void *)data, (Ecore_Thread *) work);
996 if (work->cancel == EINA_FALSE) func_end((void *)data, (Ecore_Thread *) work);
997 else func_cancel((void *)data, (Ecore_Thread *) work);
999 } while (work->reschedule == EINA_TRUE);
1008 ecore_thread_cancel(Ecore_Thread *thread)
1010 #ifdef EFL_HAVE_THREADS
1011 Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
1019 if (work->feedback_run)
1023 if (work->u.feedback_run.send != work->u.feedback_run.received)
1027 LKL(_ecore_pending_job_threads_mutex);
1029 if ((have_main_loop_thread) &&
1030 (PHE(get_main_loop_thread(), PHS())))
1032 if (!work->feedback_run)
1033 EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
1035 if ((void *) work == (void *) thread)
1037 _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);
1039 LKU(_ecore_pending_job_threads_mutex);
1041 if (work->func_cancel)
1042 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
1049 EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
1051 if ((void *) work == (void *) thread)
1053 _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);
1055 LKU(_ecore_pending_job_threads_mutex);
1057 if (work->func_cancel)
1058 work->func_cancel((void *) work->data, (Ecore_Thread *) work);
1066 LKU(_ecore_pending_job_threads_mutex);
1068 /* Delay the destruction */
1070 ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
/* Return EINA_TRUE when the given thread has been asked to cancel.
 * A NULL handle is treated as "cancelled" so callers can bail out safely.
 * Workers poll this from their work function to abort cooperatively. */
1078 ecore_thread_check(Ecore_Thread *thread)
1080 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1082 if (!worker) return EINA_TRUE;
     /* NOTE(review): unsynchronized read of a bitfield set by the main loop;
      * relies on it being a benign polling flag -- confirm. */
1083 return worker->cancel;
1086 EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
1087 Ecore_Thread_Notify_Cb func_notify,
1088 Ecore_Thread_Cb func_end,
1089 Ecore_Thread_Cb func_cancel,
1091 Eina_Bool try_no_queue)
1094 #ifdef EFL_HAVE_THREADS
1095 Ecore_Pthread_Worker *worker;
1096 Ecore_Pthread_Data *pth = NULL;
1098 if (!func_heavy) return NULL;
1100 worker = _ecore_thread_worker_new();
1101 if (!worker) goto on_error;
1103 worker->u.feedback_run.func_heavy = func_heavy;
1104 worker->u.feedback_run.func_notify = func_notify;
1105 worker->hash = NULL;
1108 worker->func_cancel = func_cancel;
1109 worker->func_end = func_end;
1110 worker->data = data;
1111 worker->cancel = EINA_FALSE;
1112 worker->feedback_run = EINA_TRUE;
1113 worker->kill = EINA_FALSE;
1114 worker->reschedule = EINA_FALSE;
1116 worker->u.feedback_run.send = 0;
1117 worker->u.feedback_run.received = 0;
1119 worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
1120 worker->u.feedback_run.direct_pipe = NULL;
1121 worker->u.feedback_run.direct_worker = NULL;
1127 worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
1128 worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
1129 worker->no_queue = EINA_TRUE;
1131 eina_threads_init();
1133 if (PHC(t, _ecore_direct_worker, worker) == 0)
1134 return (Ecore_Thread *) worker;
1136 eina_threads_shutdown();
1139 worker->no_queue = EINA_FALSE;
1141 LKL(_ecore_pending_job_threads_mutex);
1142 _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);
1144 if (_ecore_thread_count == _ecore_thread_count_max)
1146 LKU(_ecore_pending_job_threads_mutex);
1147 return (Ecore_Thread *) worker;
1150 LKU(_ecore_pending_job_threads_mutex);
1152 /* One more thread could be created. */
1153 pth = malloc(sizeof (Ecore_Pthread_Data));
1154 if (!pth) goto on_error;
1156 pth->p = _ecore_thread_pipe_get();
1157 pth->death_job = _ecore_thread_worker_new();
1158 if (!pth->p || !pth->death_job) goto on_error;
1160 eina_threads_init();
1162 if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)
1164 _ecore_thread_count++;
1165 return (Ecore_Thread *) worker;
1168 eina_threads_shutdown();
1173 if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
1174 if (pth->death_job) _ecore_thread_worker_free(pth->death_job);
1178 if (_ecore_thread_count == 0)
1180 LKL(_ecore_pending_job_threads_mutex);
1181 _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
1183 LKU(_ecore_pending_job_threads_mutex);
1185 if (func_cancel) func_cancel((void *) data, NULL);
1189 ecore_pipe_del(worker->u.feedback_run.notify);
1195 return (Ecore_Thread *) worker;
1197 Ecore_Pthread_Worker worker;
1199 (void) try_no_queue;
1202 If no thread and as we don't want to break app that rely on this
1203 facility, we will lock the interface until we are done.
1205 worker.u.feedback_run.func_heavy = func_heavy;
1206 worker.u.feedback_run.func_notify = func_notify;
1207 worker.u.feedback_run.notify = NULL;
1208 worker.u.feedback_run.send = 0;
1209 worker.u.feedback_run.received = 0;
1210 worker.func_cancel = func_cancel;
1211 worker.func_end = func_end;
1213 worker.cancel = EINA_FALSE;
1214 worker.feedback_run = EINA_TRUE;
1215 worker.kill = EINA_FALSE;
1218 worker.reschedule = EINA_FALSE;
1220 func_heavy((void *)data, (Ecore_Thread *) &worker);
1222 if (worker.cancel) func_cancel((void *)data, (Ecore_Thread *) &worker);
1223 else func_end((void *)data, (Ecore_Thread *) &worker);
1224 } while (worker.reschedule == EINA_TRUE);
/* Send a feedback payload from a running worker back to the main loop.
 * Only valid on a feedback-run worker and only from that worker's own
 * thread (enforced via PHE/PHS in the threaded build).
 * Threaded path: bump the 'send' counter and push the pointer through the
 * worker's notify pipe; _ecore_notify_handler delivers it in the main loop.
 * Non-threaded path: invoke func_notify directly. */
1231 ecore_thread_feedback(Ecore_Thread *thread, const void *data)
1233 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1235 if (!worker) return EINA_FALSE;
1236 if (!worker->feedback_run) return EINA_FALSE;
1238 #ifdef EFL_HAVE_THREADS
     /* Reject calls from any thread other than the worker itself. */
1239 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1241 worker->u.feedback_run.send++;
1242 ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
1246 worker->u.feedback_run.func_notify((void*) worker->data, thread, (void*) data);
/* Ask for one more pass of the worker's work function after the current
 * run returns (the worker loops consult work->reschedule).
 * Only valid from the worker's own thread in the threaded build. */
1253 ecore_thread_reschedule(Ecore_Thread *thread)
1255 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1257 if (!worker) return EINA_FALSE;
1259 #ifdef EFL_HAVE_THREADS
1260 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1263 worker->reschedule = EINA_TRUE;
1268 ecore_thread_active_get(void)
1270 #ifdef EFL_HAVE_THREADS
1271 return _ecore_thread_count;
1278 ecore_thread_pending_get(void)
1281 #ifdef EFL_HAVE_THREADS
1282 LKL(_ecore_pending_job_threads_mutex);
1283 ret = eina_list_count(_ecore_pending_job_threads);
1284 LKU(_ecore_pending_job_threads_mutex);
1292 ecore_thread_pending_feedback_get(void)
1295 #ifdef EFL_HAVE_THREADS
1296 LKL(_ecore_pending_job_threads_mutex);
1297 ret = eina_list_count(_ecore_pending_job_threads_feedback);
1298 LKU(_ecore_pending_job_threads_mutex);
1306 ecore_thread_pending_total_get(void)
1309 #ifdef EFL_HAVE_THREADS
1310 LKL(_ecore_pending_job_threads_mutex);
1311 ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
1312 LKU(_ecore_pending_job_threads_mutex);
1320 ecore_thread_max_get(void)
1322 return _ecore_thread_count_max;
/* Set the maximum number of pooled worker threads.
 * Requests are clamped by rejection: num must be >= 1 and strictly less
 * than twice the CPU count, otherwise the call is a silent no-op. */
1326 ecore_thread_max_set(int num)
1328 if (num < 1) return;
1329 /* avoid doing something hilarious by blocking dumb users */
1330 if (num >= (2 * eina_cpu_count())) return;
     /* NOTE(review): rejection is silent (void return); callers cannot tell
      * whether the new limit was actually applied. */
1332 _ecore_thread_count_max = num;
1336 ecore_thread_max_reset(void)
1338 _ecore_thread_count_max = eina_cpu_count();
1342 ecore_thread_available_get(void)
1345 #ifdef EFL_HAVE_THREADS
1346 LKL(_ecore_pending_job_threads_mutex);
1347 ret = _ecore_thread_count_max - _ecore_thread_count;
1348 LKU(_ecore_pending_job_threads_mutex);
1356 ecore_thread_local_data_add(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
1358 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1359 Ecore_Thread_Data *d;
1362 if ((!thread) || (!key) || (!value))
1364 #ifdef EFL_HAVE_THREADS
1365 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1368 worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1373 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1380 ret = eina_hash_direct_add(worker->hash, key, d);
1382 ret = eina_hash_add(worker->hash, key, d);
1391 ecore_thread_local_data_set(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb)
1393 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1394 Ecore_Thread_Data *d, *r;
1396 if ((!thread) || (!key) || (!value))
1398 #ifdef EFL_HAVE_THREADS
1399 if (!PHE(worker->self, PHS())) return NULL;
1402 worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1407 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1413 r = eina_hash_set(worker->hash, key, d);
1425 ecore_thread_local_data_find(Ecore_Thread *thread, const char *key)
1427 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1428 Ecore_Thread_Data *d;
1430 if ((!thread) || (!key))
1432 #ifdef EFL_HAVE_THREADS
1433 if (!PHE(worker->self, PHS())) return NULL;
1438 d = eina_hash_find(worker->hash, key);
1448 ecore_thread_local_data_del(Ecore_Thread *thread, const char *key)
1450 Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1451 if ((!thread) || (!key))
1453 #ifdef EFL_HAVE_THREADS
1454 if (!PHE(worker->self, PHS())) return EINA_FALSE;
1458 return eina_hash_del_by_key(worker->hash, key);
1465 ecore_thread_global_data_add(const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
1468 Ecore_Thread_Data *d;
1470 if ((!key) || (!value))
1472 #ifdef EFL_HAVE_THREADS
1473 LRWKWL(_ecore_thread_global_hash_lock);
1474 if (!_ecore_thread_global_hash)
1475 _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1476 LRWKU(_ecore_thread_global_hash_lock);
1478 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1484 if (!_ecore_thread_global_hash)
1486 LRWKWL(_ecore_thread_global_hash_lock);
1488 ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
1490 ret = eina_hash_add(_ecore_thread_global_hash, key, d);
1491 LRWKU(_ecore_thread_global_hash_lock);
1492 CDB(_ecore_thread_global_hash_cond);
1500 ecore_thread_global_data_set(const char *key, void *value, Eina_Free_Cb cb)
1502 Ecore_Thread_Data *d, *r;
1505 if ((!key) || (!value))
1507 #ifdef EFL_HAVE_THREADS
1508 LRWKWL(_ecore_thread_global_hash_lock);
1509 if (!_ecore_thread_global_hash)
1510 _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1511 LRWKU(_ecore_thread_global_hash_lock);
1513 if (!_ecore_thread_global_hash)
1516 if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1522 LRWKWL(_ecore_thread_global_hash_lock);
1523 r = eina_hash_set(_ecore_thread_global_hash, key, d);
1524 LRWKU(_ecore_thread_global_hash_lock);
1525 CDB(_ecore_thread_global_hash_cond);
1537 ecore_thread_global_data_find(const char *key)
1539 Ecore_Thread_Data *ret;
1542 #ifdef EFL_HAVE_THREADS
1543 if (!_ecore_thread_global_hash) return NULL;
1545 LRWKRL(_ecore_thread_global_hash_lock);
1546 ret = eina_hash_find(_ecore_thread_global_hash, key);
1547 LRWKU(_ecore_thread_global_hash_lock);
1557 ecore_thread_global_data_del(const char *key)
1563 #ifdef EFL_HAVE_THREADS
1564 if (!_ecore_thread_global_hash)
1567 LRWKWL(_ecore_thread_global_hash_lock);
1568 ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
1569 LRWKU(_ecore_thread_global_hash_lock);
1577 ecore_thread_global_data_wait(const char *key, double seconds)
1580 Ecore_Thread_Data *ret = NULL;
1584 #ifdef EFL_HAVE_THREADS
1585 if (!_ecore_thread_global_hash)
1588 tm = ecore_time_get() + seconds;
1593 struct timespec t = { 0, 0 };
1595 t.tv_sec = (long int)tm;
1596 t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);
1598 struct timeval t = { 0, 0 };
1600 t.tv_sec = (long int)tm;
1601 t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);
1603 LRWKRL(_ecore_thread_global_hash_lock);
1604 ret = eina_hash_find(_ecore_thread_global_hash, key);
1605 LRWKU(_ecore_thread_global_hash_lock);
1606 if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))
1608 LKL(_ecore_thread_global_hash_mutex);
1609 CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
1610 LKU(_ecore_thread_global_hash_mutex);
1612 if (ret) return ret->data;