#include "ecore_private.h"

#ifdef EFL_HAVE_THREADS

# ifdef EFL_HAVE_POSIX_THREADS

# include <sys/resource.h>

# include <sys/syscall.h>

# define PH(x) pthread_t x
# define PHE(x, y) pthread_equal(x, y)
# define PHS() pthread_self()
# define PHC(x, f, d) pthread_create(&(x), NULL, (void *)f, d)
# define PHJ(x, p) pthread_join(x, (void **)(&(p)))
# define PHA(x) pthread_cancel(x)

# define CD(x) pthread_cond_t x
# define CDI(x) pthread_cond_init(&(x), NULL);
# define CDD(x) pthread_cond_destroy(&(x));
# define CDB(x) pthread_cond_broadcast(&(x));
# define CDW(x, y, t) pthread_cond_timedwait(&(x), &(y), t);

# define LK(x) pthread_mutex_t x
# define LKI(x) pthread_mutex_init(&(x), NULL);
# define LKD(x) pthread_mutex_destroy(&(x));
# define LKL(x) pthread_mutex_lock(&(x));
# define LKU(x) pthread_mutex_unlock(&(x));

# define LRWK(x) pthread_rwlock_t x
# define LRWKI(x) pthread_rwlock_init(&(x), NULL);
# define LRWKD(x) pthread_rwlock_destroy(&(x));
# define LRWKWL(x) pthread_rwlock_wrlock(&(x));
# define LRWKRL(x) pthread_rwlock_rdlock(&(x));
# define LRWKU(x) pthread_rwlock_unlock(&(x));
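
/* Illustrative sketch only (not part of the original file): how the
 * LK/CD macro layer above is meant to be combined, assuming the POSIX
 * branch (so the deadline is an absolute struct timespec, as
 * pthread_cond_timedwait() expects). All "example_*" names are
 * hypothetical. */
#if 0
static LK(example_mutex);
static CD(example_cond);

static void
example_wait_until(const struct timespec *deadline)
{
   LKI(example_mutex);
   CDI(example_cond);

   LKL(example_mutex);
   /* Sleeps until another thread calls CDB(example_cond) or the
    * deadline passes. */
   CDW(example_cond, example_mutex, deadline);
   LKU(example_mutex);

   CDD(example_cond);
   LKD(example_mutex);
}
#endif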
# else /* EFL_HAVE_WIN32_THREADS */

# define WIN32_LEAN_AND_MEAN
# include <windows.h>
# undef WIN32_LEAN_AND_MEAN

# define PH(x) win32_thread * x
# define PHE(x, y) ((x) == (y))
/* The thread id is cast to a HANDLE for storage only; it is compared
 * with PHE(), never waited on. */
# define PHS() (HANDLE)GetCurrentThreadId()
_ecore_thread_win32_create(win32_thread **x,
                           LPTHREAD_START_ROUTINE f,

   t = (win32_thread *)calloc(1, sizeof(win32_thread));

   (t)->thread = CreateThread(NULL, 0, f, d, 0, NULL);

# define PHC(x, f, d) _ecore_thread_win32_create(&(x), (LPTHREAD_START_ROUTINE)f, d)

_ecore_thread_win32_join(win32_thread *x,

   WaitForSingleObject(x->thread, INFINITE);
   CloseHandle(x->thread);

   if (res) *res = x->val;

# define PHJ(x, p) _ecore_thread_win32_join(x, (void **)(&(p)))
# define PHA(x) TerminateThread(x->thread, 0)

# define LK(x) HANDLE x
# define LKI(x) x = CreateMutex(NULL, FALSE, NULL)
# define LKD(x) CloseHandle(x)
# define LKL(x) WaitForSingleObject(x, INFINITE)
# define LKU(x) ReleaseMutex(x)

   CRITICAL_SECTION threads_count_lock;

# define CD(x) win32_cond * x

   x = (win32_cond *)calloc(1, sizeof(win32_cond)); \

   x->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); \

   InitializeCriticalSection(&x->threads_count_lock); \

   CloseHandle(x->semaphore); \

   EnterCriticalSection(&x->threads_count_lock); \
   if (x->threads_count > 0) \
     ReleaseSemaphore(x->semaphore, x->threads_count, NULL); \
   LeaveCriticalSection(&x->threads_count_lock); \

/* NOTE: unlike pthread_cond_timedwait(), the wait here is relative:
 * t must hold the interval to wait, not an absolute deadline. */
_ecore_thread_win32_cond_timedwait(win32_cond *c,
                                   HANDLE *external_mutex,

   DWORD val = t->tv_sec * 1000 + (t->tv_usec / 1000);

   EnterCriticalSection(&c->threads_count_lock);

   LeaveCriticalSection(&c->threads_count_lock);

   res = WaitForSingleObject(c->semaphore, val);
   if (res == WAIT_OBJECT_0)

# define CDW(x, y, t) _ecore_thread_win32_cond_timedwait(x, y, t)

# define LRWK(x) win32_rwl * x

   x = (win32_rwl *)calloc(1, sizeof(win32_rwl)); \

   CDI(x->cond_write); \
   if (!x->cond_write) \

   CDD(x->cond_write); \

   /* write lock: wait until no reader and no writer holds the lock */ \
   if (x->writers || x->readers > 0) \

   x->writers_count++; \
   while (x->writers || x->readers > 0) \

   /* FIXME: this bumps cond_read->threads_count yet waits on the \
    * cond_write semaphore; the bookkeeping looks inconsistent. */ \
   EnterCriticalSection(&x->cond_write->threads_count_lock); \
   x->cond_read->threads_count++; \
   LeaveCriticalSection(&x->cond_write->threads_count_lock); \
   res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
   if (res != WAIT_OBJECT_0) break; \

   x->writers_count--; \

   if (res == 0) x->writers_count = 1; \

   x->readers_count++; \

   EnterCriticalSection(&x->cond_write->threads_count_lock); \
   x->cond_read->threads_count++; \
   LeaveCriticalSection(&x->cond_write->threads_count_lock); \
   res = WaitForSingleObject(x->cond_write->semaphore, INFINITE); \
   if (res != WAIT_OBJECT_0) break; \

   x->readers_count--; \

   if (x->readers_count == 1) \

   EnterCriticalSection(&x->cond_read->threads_count_lock); \
   if (x->cond_read->threads_count > 0) \
     ReleaseSemaphore(x->cond_read->semaphore, 1, 0); \
   LeaveCriticalSection(&x->cond_read->threads_count_lock); \

   else if (x->readers_count > 0) \

   else if (x->writers_count > 0) \

   EnterCriticalSection(&x->cond_write->threads_count_lock); \
   if (x->cond_write->threads_count > 0) \
     ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
   LeaveCriticalSection(&x->cond_write->threads_count_lock); \

   else if (x->readers > 0) \

   if (x->readers == 0 && x->writers_count > 0) \

   EnterCriticalSection(&x->cond_write->threads_count_lock); \
   if (x->cond_write->threads_count > 0) \
     ReleaseSemaphore(x->cond_write->semaphore, 1, 0); \
   LeaveCriticalSection(&x->cond_write->threads_count_lock); \
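
/* Illustrative sketch only (not in the original file): whichever branch
 * is compiled, the LRWK* macros expose the same read/write lock pattern
 * that is used below for _ecore_thread_global_hash. "example_lock" is
 * hypothetical. */
#if 0
static LRWK(example_lock);

static void
example_rwlock_usage(void)
{
   LRWKI(example_lock);

   LRWKRL(example_lock);  /* shared: many readers may hold this at once */
   /* ... read ... */
   LRWKU(example_lock);

   LRWKWL(example_lock);  /* exclusive: blocks until readers and writers leave */
   /* ... write ... */
   LRWKU(example_lock);

   LRWKD(example_lock);
}
#endif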

typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
typedef struct _Ecore_Pthread Ecore_Pthread;
typedef struct _Ecore_Thread_Data Ecore_Thread_Data;

struct _Ecore_Thread_Data

struct _Ecore_Pthread_Worker

   Ecore_Thread_Cb func_blocking;

   Ecore_Thread_Cb func_heavy;
   Ecore_Thread_Notify_Cb func_notify;

   Ecore_Pipe *direct_pipe;
   Ecore_Pthread_Worker *direct_worker;

   Ecore_Thread_Cb func_cancel;
   Ecore_Thread_Cb func_end;
#ifdef EFL_HAVE_THREADS

   Eina_Bool cancel : 1;
   Eina_Bool feedback_run : 1;

   Eina_Bool reschedule : 1;
   Eina_Bool no_queue : 1;

#ifdef EFL_HAVE_THREADS
typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;

struct _Ecore_Pthread_Data

   Ecore_Pthread_Worker *death_job;

static int _ecore_thread_count_max = 0;
static int ECORE_THREAD_PIPE_DEL = 0;
static Eina_Array *_ecore_thread_pipe = NULL;

#ifdef EFL_HAVE_THREADS

static void _ecore_thread_handler(void *data __UNUSED__,

_ecore_thread_pipe_get(void)

   if (eina_array_count_get(_ecore_thread_pipe) > 0)
     return eina_array_pop(_ecore_thread_pipe);

   return ecore_pipe_add(_ecore_thread_handler, NULL);

static int _ecore_thread_count = 0;

static Ecore_Event_Handler *del_handler = NULL;
static Eina_List *_ecore_active_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads = NULL;
static Eina_List *_ecore_pending_job_threads_feedback = NULL;
static LK(_ecore_pending_job_threads_mutex);

static Eina_Hash *_ecore_thread_global_hash = NULL;
static LRWK(_ecore_thread_global_hash_lock);
static LK(_ecore_thread_global_hash_mutex);
static CD(_ecore_thread_global_hash_cond);

static Eina_Bool have_main_loop_thread = 0;

static Eina_Trash *_ecore_thread_worker_trash = NULL;
static int _ecore_thread_worker_count = 0;

static void *_ecore_thread_worker(Ecore_Pthread_Data *pth);
static Ecore_Pthread_Worker *_ecore_thread_worker_new(void);

static PH(get_main_loop_thread) (void)

   static PH(main_loop_thread);
   static pid_t main_loop_pid;
   pid_t pid = getpid();

   if (pid != main_loop_pid)

        main_loop_thread = PHS();
        have_main_loop_thread = 1;

   return main_loop_thread;

_ecore_thread_worker_free(Ecore_Pthread_Worker *worker)

   if (_ecore_thread_worker_count > (_ecore_thread_count_max + 1) * 16)

   eina_trash_push(&_ecore_thread_worker_trash, worker);

_ecore_thread_data_free(void *data)

   Ecore_Thread_Data *d = data;

   if (d->cb) d->cb(d->data);

_ecore_thread_pipe_free(void *data __UNUSED__,

   Ecore_Pipe *p = event;

   if (eina_array_count_get(_ecore_thread_pipe) < 50)
     eina_array_push(_ecore_thread_pipe, p);

   eina_threads_shutdown();

_ecore_thread_pipe_del(void *data __UNUSED__,
                       void *event __UNUSED__)

   /* This is a hack to delay pipe destruction until we are out of its internal loop. */
   return ECORE_CALLBACK_CANCEL;

_ecore_thread_end(Ecore_Pthread_Data *pth,

   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)work;

   if (!worker->feedback_run || !worker->no_queue)
     _ecore_thread_count--;

   if (PHJ(pth->thread, p) != 0)

   if (eina_list_count(_ecore_pending_job_threads) > 0
       && (unsigned int)_ecore_thread_count < eina_list_count(_ecore_pending_job_threads)
       && _ecore_thread_count < _ecore_thread_count_max)

        /* One more thread should be created. */
        INF("spawning threads because of still pending jobs.");

        pth->death_job = _ecore_thread_worker_new();
        if (!pth->p || !pth->death_job) goto end;

        if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)

             _ecore_thread_count++;

        eina_threads_shutdown();

   if (pth->death_job) _ecore_thread_worker_free(pth->death_job);

   _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);

   ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);

_ecore_thread_kill(Ecore_Pthread_Worker *work)

   if (work->func_cancel)
     work->func_cancel((void *)work->data, (Ecore_Thread *)work);

   work->func_end((void *)work->data, (Ecore_Thread *)work);

   if (work->feedback_run)

        ecore_pipe_del(work->u.feedback_run.notify);

        if (work->u.feedback_run.direct_pipe)
          eina_array_push(_ecore_thread_pipe, work->u.feedback_run.direct_pipe);
        if (work->u.feedback_run.direct_worker)
          _ecore_thread_worker_free(work->u.feedback_run.direct_worker);

   eina_hash_free(work->hash);

_ecore_thread_handler(void *data __UNUSED__,

   Ecore_Pthread_Worker *work;

   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;

   work = *(Ecore_Pthread_Worker **)buffer;

   if (work->feedback_run)

        if (work->u.feedback_run.send != work->u.feedback_run.received)

             work->kill = EINA_TRUE;

   _ecore_thread_kill(work);

_ecore_notify_handler(void *data,

   Ecore_Pthread_Worker *work = data;

   if (nbyte != sizeof (Ecore_Pthread_Worker *)) return;

   user_data = *(void **)buffer;
   work->u.feedback_run.received++;

   if (work->u.feedback_run.func_notify)
     work->u.feedback_run.func_notify((void *)work->data, (Ecore_Thread *)work, user_data);

   /* Force reading all notify events before killing the thread. */
   if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)

        _ecore_thread_kill(work);

_ecore_short_job(Ecore_Pipe *end_pipe)

   Ecore_Pthread_Worker *work;

   while (_ecore_pending_job_threads)

        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads)

             LKU(_ecore_pending_job_threads_mutex);

        work = eina_list_data_get(_ecore_pending_job_threads);
        _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
                                                           _ecore_pending_job_threads);

        LKU(_ecore_pending_job_threads_mutex);

        work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work);

        if (work->reschedule)

             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
             LKU(_ecore_pending_job_threads_mutex);

        ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));

_ecore_feedback_job(Ecore_Pipe *end_pipe,

   Ecore_Pthread_Worker *work;

   while (_ecore_pending_job_threads_feedback)

        LKL(_ecore_pending_job_threads_mutex);

        if (!_ecore_pending_job_threads_feedback)

             LKU(_ecore_pending_job_threads_mutex);

        work = eina_list_data_get(_ecore_pending_job_threads_feedback);
        _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
                                                                    _ecore_pending_job_threads_feedback);

        LKU(_ecore_pending_job_threads_mutex);

        work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);

        if (work->reschedule)

             work->reschedule = EINA_FALSE;

             LKL(_ecore_pending_job_threads_mutex);
             _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, work);
             LKU(_ecore_pending_job_threads_mutex);

        ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));

_ecore_direct_worker(Ecore_Pthread_Worker *work)

   Ecore_Pthread_Data *pth;

#ifdef EFL_HAVE_POSIX_THREADS
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

   eina_sched_prio_drop();

   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) return NULL;

   pth->p = work->u.feedback_run.direct_pipe;

   work->self = pth->thread;
   work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work);

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

   work = work->u.feedback_run.direct_worker;

   work->u.short_run.func_blocking = NULL;
   work->func_end = (void *)_ecore_thread_end;
   work->func_cancel = NULL;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

_ecore_thread_worker(Ecore_Pthread_Data *pth)

   Ecore_Pthread_Worker *work;

#ifdef EFL_HAVE_POSIX_THREADS
   pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

   eina_sched_prio_drop();

   if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
   if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);

   /* FIXME: Check whether there are pending feedback tasks and, if so, switch to the feedback run handler. */

   LKL(_ecore_pending_job_threads_mutex);
   if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)

        LKU(_ecore_pending_job_threads_mutex);

   LKU(_ecore_pending_job_threads_mutex);

   /* Sleep a little to prevent premature death */

   Sleep(1); /* at least 1ms; the actual delay depends on the scheduler granularity */

   LKL(_ecore_pending_job_threads_mutex);
   if (_ecore_pending_job_threads || _ecore_pending_job_threads_feedback)

        LKU(_ecore_pending_job_threads_mutex);

   LKU(_ecore_pending_job_threads_mutex);

   work = pth->death_job;
   if (!work) return NULL;

   work->u.short_run.func_blocking = NULL;
   work->func_end = (void *)_ecore_thread_end;
   work->func_cancel = NULL;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;

   ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));

static Ecore_Pthread_Worker *
_ecore_thread_worker_new(void)

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *result;

   result = eina_trash_pop(&_ecore_thread_worker_trash);

   if (!result) result = malloc(sizeof (Ecore_Pthread_Worker));
   else _ecore_thread_worker_count--;

   return malloc(sizeof (Ecore_Pthread_Worker));
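
/* Illustrative sketch only (not in the original file): the Eina_Trash
 * pool pattern used by _ecore_thread_worker_new() and
 * _ecore_thread_worker_free() -- released workers are parked in a trash
 * list and popped instead of calling malloc() again. The "example_*"
 * names are hypothetical. */
#if 0
static Eina_Trash *example_pool = NULL;

static Ecore_Pthread_Worker *
example_worker_alloc(void)
{
   Ecore_Pthread_Worker *w = eina_trash_pop(&example_pool);

   if (!w) w = malloc(sizeof (Ecore_Pthread_Worker));
   return w;
}

static void
example_worker_release(Ecore_Pthread_Worker *w)
{
   /* No free(): the chunk is threaded onto the pool for reuse. */
   eina_trash_push(&example_pool, w);
}
#endif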

_ecore_thread_init(void)

   _ecore_thread_count_max = eina_cpu_count();
   if (_ecore_thread_count_max <= 0)
     _ecore_thread_count_max = 1;

   ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
   _ecore_thread_pipe = eina_array_new(8);

#ifdef EFL_HAVE_THREADS
   del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);

   LKI(_ecore_pending_job_threads_mutex);
   LRWKI(_ecore_thread_global_hash_lock);
   LKI(_ecore_thread_global_hash_mutex);
   CDI(_ecore_thread_global_hash_cond);

_ecore_thread_shutdown(void)

   /* FIXME: If functions are still running in the background, should we kill them? */

   Eina_Array_Iterator it;

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work;
   Ecore_Pthread_Data *pth;

   LKL(_ecore_pending_job_threads_mutex);

   EINA_LIST_FREE(_ecore_pending_job_threads, work)

        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *)work);

   EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)

        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *)work);

   LKU(_ecore_pending_job_threads_mutex);

   /* TODO: improve the emergency shutdown path. */
   EINA_LIST_FREE(_ecore_active_job_threads, pth)

        PHJ(pth->thread, ep);

        ecore_pipe_del(pth->p);

   if (_ecore_thread_global_hash)
     eina_hash_free(_ecore_thread_global_hash);
   _ecore_event_handler_del(del_handler);
   have_main_loop_thread = 0;

   LKD(_ecore_pending_job_threads_mutex);
   LRWKD(_ecore_thread_global_hash_lock);
   LKD(_ecore_thread_global_hash_mutex);
   CDD(_ecore_thread_global_hash_cond);

   EINA_ARRAY_ITER_NEXT(_ecore_thread_pipe, i, p, it)

   eina_array_free(_ecore_thread_pipe);
   _ecore_thread_pipe = NULL;

_ecore_thread_assert_main_loop_thread(const char *function)

#ifdef EFL_HAVE_THREADS
   good = PHE(get_main_loop_thread(), PHS());

   EINA_LOG_CRIT("Call to %s from wrong thread!", function);

ecore_thread_run(Ecore_Thread_Cb func_blocking,
                 Ecore_Thread_Cb func_end,
                 Ecore_Thread_Cb func_cancel,

   Ecore_Pthread_Worker *work;
#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Data *pth = NULL;

   if (!func_blocking) return NULL;

   work = _ecore_thread_worker_new();

        func_cancel((void *)data, NULL);

   work->u.short_run.func_blocking = func_blocking;
   work->func_end = func_end;
   work->func_cancel = func_cancel;
   work->cancel = EINA_FALSE;
   work->feedback_run = EINA_FALSE;
   work->kill = EINA_FALSE;
   work->reschedule = EINA_FALSE;

#ifdef EFL_HAVE_THREADS

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);

   if (_ecore_thread_count == _ecore_thread_count_max)

        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *)work;

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)

        _ecore_thread_count++;
        return (Ecore_Thread *)work;

   eina_threads_shutdown();

   if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
   if (pth->death_job) _ecore_thread_worker_free(pth->death_job);

   if (_ecore_thread_count == 0)

        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
        LKU(_ecore_pending_job_threads_mutex);

        if (work->func_cancel)
          work->func_cancel((void *)work->data, (Ecore_Thread *)work);

   return (Ecore_Thread *)work;

   If threads are not available, since we don't want to break apps that rely
   on this facility, we lock the interface until we are done.

   /* Handle reschedule by forcing it here: that blocks the app. An idler
    * would be nicer, but is really too complex for a case where threads
    * should exist anyway.

        work->reschedule = EINA_FALSE;

        func_blocking((void *)data, (Ecore_Thread *)work);
        if (work->cancel == EINA_FALSE) func_end((void *)data, (Ecore_Thread *)work);
        else func_cancel((void *)data, (Ecore_Thread *)work);
     } while (work->reschedule == EINA_TRUE);
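
/* Illustrative sketch only (not in the original file): typical use of
 * ecore_thread_run() from main-loop code. All "example_*" callbacks are
 * hypothetical. */
#if 0
static void
example_blocking(void *data, Ecore_Thread *thread)
{
   /* Runs in a worker thread: no Ecore/Evas calls allowed here. */
}

static void
example_end(void *data, Ecore_Thread *thread)
{
   /* Runs back in the main loop once example_blocking() returns. */
}

static void
example_cancel(void *data, Ecore_Thread *thread)
{
   /* Runs in the main loop if the job is cancelled instead. */
}

static void
example_start(void *data)
{
   /* Called from main-loop code. */
   ecore_thread_run(example_blocking, example_end, example_cancel, data);
}
#endif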

ecore_thread_cancel(Ecore_Thread *thread)

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;

   if (work->feedback_run)

        if (work->u.feedback_run.send != work->u.feedback_run.received)

   LKL(_ecore_pending_job_threads_mutex);

   if ((have_main_loop_thread) &&
       (PHE(get_main_loop_thread(), PHS())))

        if (!work->feedback_run)
          EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)

               if ((void *)work == (void *)thread)

                    _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);

                    LKU(_ecore_pending_job_threads_mutex);

                    if (work->func_cancel)
                      work->func_cancel((void *)work->data, (Ecore_Thread *)work);

        EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)

             if ((void *)work == (void *)thread)

                  _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);

                  LKU(_ecore_pending_job_threads_mutex);

                  if (work->func_cancel)
                    work->func_cancel((void *)work->data, (Ecore_Thread *)work);

   LKU(_ecore_pending_job_threads_mutex);

   /* Delay the destruction */

   ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;

ecore_thread_check(Ecore_Thread *thread)

   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;

   if (!worker) return EINA_TRUE;
   return worker->cancel;
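
/* Illustrative sketch only (not in the original file): a cooperative,
 * cancellation-aware func_blocking. "example_*" names are hypothetical. */
#if 0
static void
example_cancellable_blocking(void *data, Ecore_Thread *thread)
{
   int i;

   for (i = 0; i < 1000; i++)
     {
        /* Stop early if ecore_thread_cancel() was requested. */
        if (ecore_thread_check(thread)) return;
        /* ... one bounded chunk of heavy work ... */
     }
}
#endif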

ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy,
                          Ecore_Thread_Notify_Cb func_notify,
                          Ecore_Thread_Cb func_end,
                          Ecore_Thread_Cb func_cancel,

                          Eina_Bool try_no_queue)

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker;
   Ecore_Pthread_Data *pth = NULL;

   if (!func_heavy) return NULL;

   worker = _ecore_thread_worker_new();
   if (!worker) goto on_error;

   worker->u.feedback_run.func_heavy = func_heavy;
   worker->u.feedback_run.func_notify = func_notify;
   worker->hash = NULL;

   worker->func_cancel = func_cancel;
   worker->func_end = func_end;
   worker->data = data;
   worker->cancel = EINA_FALSE;
   worker->feedback_run = EINA_TRUE;
   worker->kill = EINA_FALSE;
   worker->reschedule = EINA_FALSE;

   worker->u.feedback_run.send = 0;
   worker->u.feedback_run.received = 0;

   worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
   worker->u.feedback_run.direct_pipe = NULL;
   worker->u.feedback_run.direct_worker = NULL;

        worker->u.feedback_run.direct_pipe = _ecore_thread_pipe_get();
        worker->u.feedback_run.direct_worker = _ecore_thread_worker_new();
        worker->no_queue = EINA_TRUE;

        eina_threads_init();

        if (PHC(t, _ecore_direct_worker, worker) == 0)
          return (Ecore_Thread *)worker;

        eina_threads_shutdown();

   worker->no_queue = EINA_FALSE;

   LKL(_ecore_pending_job_threads_mutex);
   _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);

   if (_ecore_thread_count == _ecore_thread_count_max)

        LKU(_ecore_pending_job_threads_mutex);
        return (Ecore_Thread *)worker;

   LKU(_ecore_pending_job_threads_mutex);

   /* One more thread could be created. */
   pth = malloc(sizeof (Ecore_Pthread_Data));
   if (!pth) goto on_error;

   pth->p = _ecore_thread_pipe_get();
   pth->death_job = _ecore_thread_worker_new();
   if (!pth->p || !pth->death_job) goto on_error;

   eina_threads_init();

   if (PHC(pth->thread, _ecore_thread_worker, pth) == 0)

        _ecore_thread_count++;
        return (Ecore_Thread *)worker;

   eina_threads_shutdown();

   if (pth->p) eina_array_push(_ecore_thread_pipe, pth->p);
   if (pth->death_job) _ecore_thread_worker_free(pth->death_job);

   if (_ecore_thread_count == 0)

        LKL(_ecore_pending_job_threads_mutex);
        _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,

        LKU(_ecore_pending_job_threads_mutex);

        if (func_cancel) func_cancel((void *)data, NULL);

   ecore_pipe_del(worker->u.feedback_run.notify);

   return (Ecore_Thread *)worker;

   Ecore_Pthread_Worker worker;

   If threads are not available, since we don't want to break apps that rely
   on this facility, we lock the interface until we are done.

   worker.u.feedback_run.func_heavy = func_heavy;
   worker.u.feedback_run.func_notify = func_notify;
   worker.u.feedback_run.notify = NULL;
   worker.u.feedback_run.send = 0;
   worker.u.feedback_run.received = 0;
   worker.func_cancel = func_cancel;
   worker.func_end = func_end;

   worker.cancel = EINA_FALSE;
   worker.feedback_run = EINA_TRUE;
   worker.kill = EINA_FALSE;

   worker.reschedule = EINA_FALSE;

   func_heavy((void *)data, (Ecore_Thread *)&worker);

   if (worker.cancel) func_cancel((void *)data, (Ecore_Thread *)&worker);
   else func_end((void *)data, (Ecore_Thread *)&worker);
   } while (worker.reschedule == EINA_TRUE);
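
/* Illustrative sketch only (not in the original file): a feedback job
 * that streams progress to the main loop with ecore_thread_feedback().
 * All "example_*" names are hypothetical. */
#if 0
static void
example_heavy(void *data, Ecore_Thread *thread)
{
   int step;

   for (step = 0; step <= 100; step += 10)
     {
        int *progress = malloc(sizeof (int));

        *progress = step;
        /* Queued to the main loop; freed there by example_notify(). */
        ecore_thread_feedback(thread, progress);
     }
}

static void
example_notify(void *data, Ecore_Thread *thread, void *msg)
{
   int *progress = msg;

   /* Main-loop context: safe to update UI state here. */
   free(progress);
}

static void
example_feedback_start(void *data)
{
   ecore_thread_feedback_run(example_heavy, example_notify,
                             NULL /* func_end */, NULL /* func_cancel */,
                             data, EINA_FALSE /* try_no_queue */);
}
#endif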

ecore_thread_feedback(Ecore_Thread *thread,

   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;

   if (!worker) return EINA_FALSE;
   if (!worker->feedback_run) return EINA_FALSE;

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   worker->u.feedback_run.send++;
   ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));

   worker->u.feedback_run.func_notify((void *)worker->data, thread, (void *)data);

ecore_thread_reschedule(Ecore_Thread *thread)

   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;

   if (!worker) return EINA_FALSE;

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   worker->reschedule = EINA_TRUE;
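
/* Illustrative sketch only (not in the original file): processing a long
 * task in slices by asking to be rescheduled instead of looping forever
 * in one run. "example_*" names are hypothetical. */
#if 0
static void
example_sliced_blocking(void *data, Ecore_Thread *thread)
{
   /* ... process one bounded chunk of work ... */

   /* If more work remains, re-queue this job; func_end only runs once
    * the job stops asking to be rescheduled. */
   if (example_more_work_left(data))
     ecore_thread_reschedule(thread);
}
#endif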

ecore_thread_active_get(void)

#ifdef EFL_HAVE_THREADS
   return _ecore_thread_count;

ecore_thread_pending_get(void)

#ifdef EFL_HAVE_THREADS

   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads);
   LKU(_ecore_pending_job_threads_mutex);

ecore_thread_pending_feedback_get(void)

#ifdef EFL_HAVE_THREADS

   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads_feedback);
   LKU(_ecore_pending_job_threads_mutex);

ecore_thread_pending_total_get(void)

#ifdef EFL_HAVE_THREADS

   LKL(_ecore_pending_job_threads_mutex);
   ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
   LKU(_ecore_pending_job_threads_mutex);

ecore_thread_max_get(void)

   return _ecore_thread_count_max;

ecore_thread_max_set(int num)

   if (num < 1) return;
   /* avoid doing something hilarious by blocking dumb users */
   if (num >= (2 * eina_cpu_count())) return;

   _ecore_thread_count_max = num;

ecore_thread_max_reset(void)

   _ecore_thread_count_max = eina_cpu_count();
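
/* Illustrative sketch only (not in the original file): tuning the worker
 * pool size. Values below 1 or at/above twice the core count are silently
 * ignored by ecore_thread_max_set(). */
#if 0
static void
example_tune_pool(void)
{
   if (ecore_thread_max_get() < 4)
     ecore_thread_max_set(4);   /* silently ignored on machines with <= 2 cores */

   ecore_thread_max_reset();    /* back to one worker per CPU */
}
#endif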

ecore_thread_available_get(void)

#ifdef EFL_HAVE_THREADS

   LKL(_ecore_pending_job_threads_mutex);
   ret = _ecore_thread_count_max - _ecore_thread_count;
   LKU(_ecore_pending_job_threads_mutex);

ecore_thread_local_data_add(Ecore_Thread *thread,

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
   Ecore_Thread_Data *d;

   if ((!thread) || (!key) || (!value))

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))

   ret = eina_hash_direct_add(worker->hash, key, d);

   ret = eina_hash_add(worker->hash, key, d);
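
/* Illustrative sketch only (not in the original file): per-thread storage,
 * which only the owning worker thread may touch (note the PHE() self check
 * above). "example_*" names are hypothetical; the cb parameter is the
 * destructor later run via _ecore_thread_data_free(). */
#if 0
static void
example_heavy_with_tls(void *data, Ecore_Thread *thread)
{
   char *scratch = strdup("scratch buffer");

   /* direct == EINA_FALSE: the key string is copied by the hash. */
   ecore_thread_local_data_add(thread, "scratch", scratch, free, EINA_FALSE);

   /* ... later, from the same thread ... */
   scratch = ecore_thread_local_data_find(thread, "scratch");

   ecore_thread_local_data_del(thread, "scratch"); /* runs free(scratch) */
}
#endif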

ecore_thread_local_data_set(Ecore_Thread *thread,

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
   Ecore_Thread_Data *d, *r;

   if ((!thread) || (!key) || (!value))

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return NULL;

   worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))

   r = eina_hash_set(worker->hash, key, d);

ecore_thread_local_data_find(Ecore_Thread *thread,

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;
   Ecore_Thread_Data *d;

   if ((!thread) || (!key))

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return NULL;

   d = eina_hash_find(worker->hash, key);

ecore_thread_local_data_del(Ecore_Thread *thread,

#ifdef EFL_HAVE_THREADS
   Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread;

   if ((!thread) || (!key))

#ifdef EFL_HAVE_THREADS
   if (!PHE(worker->self, PHS())) return EINA_FALSE;

   return eina_hash_del_by_key(worker->hash, key);

ecore_thread_global_data_add(const char *key,

#ifdef EFL_HAVE_THREADS
   Ecore_Thread_Data *d;

   if ((!key) || (!value))

#ifdef EFL_HAVE_THREADS
   LRWKWL(_ecore_thread_global_hash_lock);
   if (!_ecore_thread_global_hash)
     _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
   LRWKU(_ecore_thread_global_hash_lock);

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))

   if (!_ecore_thread_global_hash)

   LRWKWL(_ecore_thread_global_hash_lock);

   ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);

   ret = eina_hash_add(_ecore_thread_global_hash, key, d);
   LRWKU(_ecore_thread_global_hash_lock);
   CDB(_ecore_thread_global_hash_cond);

ecore_thread_global_data_set(const char *key,

#ifdef EFL_HAVE_THREADS
   Ecore_Thread_Data *d, *r;

   if ((!key) || (!value))

#ifdef EFL_HAVE_THREADS
   LRWKWL(_ecore_thread_global_hash_lock);
   if (!_ecore_thread_global_hash)
     _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
   LRWKU(_ecore_thread_global_hash_lock);

   if (!_ecore_thread_global_hash)

   if (!(d = malloc(sizeof(Ecore_Thread_Data))))

   LRWKWL(_ecore_thread_global_hash_lock);
   r = eina_hash_set(_ecore_thread_global_hash, key, d);
   LRWKU(_ecore_thread_global_hash_lock);
   CDB(_ecore_thread_global_hash_cond);

ecore_thread_global_data_find(const char *key)

#ifdef EFL_HAVE_THREADS
   Ecore_Thread_Data *ret;

#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash) return NULL;

   LRWKRL(_ecore_thread_global_hash_lock);
   ret = eina_hash_find(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);

ecore_thread_global_data_del(const char *key)

#ifdef EFL_HAVE_THREADS

#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)

   LRWKWL(_ecore_thread_global_hash_lock);
   ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);

ecore_thread_global_data_wait(const char *key,

#ifdef EFL_HAVE_THREADS

   Ecore_Thread_Data *ret = NULL;

#ifdef EFL_HAVE_THREADS
   if (!_ecore_thread_global_hash)

   tm = ecore_time_get() + seconds;

   struct timespec t = { 0, 0 };

   t.tv_sec = (long int)tm;
   t.tv_nsec = (long int)((tm - (double)t.tv_sec) * 1000000000);

   struct timeval t = { 0, 0 };

   t.tv_sec = (long int)tm;
   t.tv_usec = (long int)((tm - (double)t.tv_sec) * 1000000);

   LRWKRL(_ecore_thread_global_hash_lock);
   ret = eina_hash_find(_ecore_thread_global_hash, key);
   LRWKU(_ecore_thread_global_hash_lock);
   if ((ret) || (!seconds) || ((seconds > 0) && (tm <= ecore_time_get())))

   LKL(_ecore_thread_global_hash_mutex);
   CDW(_ecore_thread_global_hash_cond, _ecore_thread_global_hash_mutex, &t);
   LKU(_ecore_thread_global_hash_mutex);

   if (ret) return ret->data;
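
/* Illustrative sketch only (not in the original file): one thread
 * publishes a value, another blocks on it. ecore_thread_global_data_add()
 * broadcasts _ecore_thread_global_hash_cond (the CDB above), which wakes
 * the waiter so it can recheck the hash. "example_*" names are
 * hypothetical. */
#if 0
static void
example_producer(void *data, Ecore_Thread *thread)
{
   /* direct == EINA_FALSE: the key is duplicated by the hash. */
   ecore_thread_global_data_add("shared-config", strdup("value"),
                                free, EINA_FALSE);
}

static void
example_consumer(void *data, Ecore_Thread *thread)
{
   /* Blocks for up to 2.5s, rechecking on every broadcast. */
   char *value = ecore_thread_global_data_wait("shared-config", 2.5);

   if (value)
     {
        /* ... use it ... */
     }
}
#endif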