[ecore] merged svn latest code (svn54830)
[profile/ivi/ecore.git] / src / lib / ecore / ecore_thread.c
1 #ifdef HAVE_CONFIG_H
2 # include <config.h>
3 #endif
4
5 #ifdef HAVE_EVIL
6 # include <Evil.h>
7 #endif
8
9 #ifdef EFL_HAVE_PTHREAD
10 # include <pthread.h>
11 # ifdef __linux__
12 #  include <sched.h>
13 #  include <sys/time.h>
14 #  include <sys/resource.h>
15 #  include <unistd.h>
16 #  include <sys/syscall.h>
17 #  include <errno.h>
18 # endif
19 #endif
20
21 #include "Ecore.h"
22 #include "ecore_private.h"
23
24 typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker;
25 typedef struct _Ecore_Pthread Ecore_Pthread;
26 typedef struct _Ecore_Thread_Data  Ecore_Thread_Data;
27
28 struct _Ecore_Thread_Data
29 {
30    void *data;
31    Eina_Free_Cb cb;
32 };
33
34 struct _Ecore_Pthread_Worker
35 {
36    union {
37       struct {
38          Ecore_Thread_Heavy_Cb func_blocking;
39       } short_run;
40       struct {
41          Ecore_Thread_Heavy_Cb func_heavy;
42          Ecore_Thread_Notify_Cb func_notify;
43          Ecore_Pipe *notify;
44
45          int send;
46          int received;
47       } feedback_run;
48    } u;
49
50    Ecore_Cb func_cancel;
51    Ecore_Cb func_end;
52 #ifdef EFL_HAVE_PTHREAD
53    pthread_t self;
54    Eina_Hash *hash;
55    pthread_cond_t cond;
56    pthread_mutex_t mutex;
57 #endif
58
59    const void *data;
60
61    Eina_Bool cancel : 1;
62    Eina_Bool feedback_run : 1;
63    Eina_Bool kill : 1;
64 };
65
66 #ifdef EFL_HAVE_PTHREAD
67 typedef struct _Ecore_Pthread_Data Ecore_Pthread_Data;
68
69 struct _Ecore_Pthread_Data
70 {
71    Ecore_Pipe *p;
72    void *data;
73    pthread_t thread;
74 };
75 #endif
76
77 static int _ecore_thread_count_max = 0;
78 static int ECORE_THREAD_PIPE_DEL = 0;
79
80 #ifdef EFL_HAVE_PTHREAD
81 static int _ecore_thread_count = 0;
82
83 static Eina_List *_ecore_active_job_threads = NULL;
84 static Eina_List *_ecore_pending_job_threads = NULL;
85 static Eina_List *_ecore_pending_job_threads_feedback = NULL;
86 static Ecore_Event_Handler *del_handler = NULL;
87 static pthread_mutex_t _ecore_pending_job_threads_mutex = PTHREAD_MUTEX_INITIALIZER;
88
89 static Eina_Hash *_ecore_thread_global_hash = NULL;
90 static pthread_rwlock_t _ecore_thread_global_hash_lock = PTHREAD_RWLOCK_INITIALIZER;
91 static pthread_mutex_t _ecore_thread_global_hash_mutex = PTHREAD_MUTEX_INITIALIZER;
92 static pthread_cond_t _ecore_thread_global_hash_cond = PTHREAD_COND_INITIALIZER;
93 static pthread_t main_loop_thread;
94 static Eina_Bool have_main_loop_thread = 0;
95
96 static void
97 _ecore_thread_data_free(void *data)
98 {
99    Ecore_Thread_Data *d = data;
100
101    if (d->cb) d->cb(d->data);
102    free(d);
103 }
104
105 static void
106 _ecore_thread_pipe_free(void *data __UNUSED__, void *event)
107 {
108    Ecore_Pipe *p = event;
109
110    ecore_pipe_del(p);
111    eina_threads_shutdown();
112 }
113
114 static Eina_Bool
115 _ecore_thread_pipe_del(void *data __UNUSED__, int type __UNUSED__, void *event __UNUSED__)
116 {
117    /* This is a hack to delay pipe destruction until we are out of its internal loop. */
118    return ECORE_CALLBACK_CANCEL;
119 }
120
121 static void
122 _ecore_thread_end(Ecore_Pthread_Data *pth)
123 {
124    Ecore_Pipe *p;
125
126    if (pthread_join(pth->thread, (void **) &p) != 0)
127      return ;
128
129    _ecore_active_job_threads = eina_list_remove(_ecore_active_job_threads, pth);
130
131    ecore_event_add(ECORE_THREAD_PIPE_DEL, pth->p, _ecore_thread_pipe_free, NULL);
132    free(pth);
133 }
134
135 static void
136 _ecore_thread_kill(Ecore_Pthread_Worker *work)
137 {
138    if (work->cancel)
139      {
140         if (work->func_cancel)
141           work->func_cancel((void *) work->data);
142      }
143    else
144      {
145         if (work->func_end)
146           work->func_end((void *) work->data);
147      }
148
149    if (work->feedback_run)
150      ecore_pipe_del(work->u.feedback_run.notify);
151    pthread_cond_destroy(&work->cond);
152    pthread_mutex_destroy(&work->mutex);
153    if (work->hash)
154      eina_hash_free(work->hash);
155    free(work);
156 }
157
158 static void
159 _ecore_thread_handler(void *data __UNUSED__, void *buffer, unsigned int nbyte)
160 {
161    Ecore_Pthread_Worker *work;
162
163    if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
164
165    work = *(Ecore_Pthread_Worker **)buffer;
166
167    if (work->feedback_run)
168      {
169         if (work->u.feedback_run.send != work->u.feedback_run.received)
170           {
171              work->kill = EINA_TRUE;
172              return ;
173           }
174      }
175
176    _ecore_thread_kill(work);
177 }
178
179 static void
180 _ecore_notify_handler(void *data, void *buffer, unsigned int nbyte)
181 {
182    Ecore_Pthread_Worker *work = data;
183    void *user_data;
184
185    if (nbyte != sizeof (Ecore_Pthread_Worker *)) return ;
186
187    user_data = *(void **)buffer;
188    work->u.feedback_run.received++;
189
190    if (work->u.feedback_run.func_notify)
191      work->u.feedback_run.func_notify((Ecore_Thread *) work, user_data, (void *) work->data);
192
193    /* Force reading all notify event before killing the thread */
194    if (work->kill && work->u.feedback_run.send == work->u.feedback_run.received)
195      {
196         _ecore_thread_kill(work);
197      }
198 }
199
200 static void
201 _ecore_short_job(Ecore_Pipe *end_pipe)
202 {
203    Ecore_Pthread_Worker *work;
204
205    while (_ecore_pending_job_threads)
206      {
207         pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
208
209         if (!_ecore_pending_job_threads)
210           {
211              pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
212              break;
213           }
214
215         work = eina_list_data_get(_ecore_pending_job_threads);
216         _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads,
217                                                            _ecore_pending_job_threads);
218
219         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
220
221         if (!work->cancel)
222           work->u.short_run.func_blocking((Ecore_Thread*) work, (void *) work->data);
223
224         ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
225      }
226 }
227
228 static void
229 _ecore_feedback_job(Ecore_Pipe *end_pipe, pthread_t thread)
230 {
231    Ecore_Pthread_Worker *work;
232
233    while (_ecore_pending_job_threads_feedback)
234      {
235         pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
236
237         if (!_ecore_pending_job_threads_feedback)
238           {
239              pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
240              break;
241           }
242
243         work = eina_list_data_get(_ecore_pending_job_threads_feedback);
244         _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback,
245                                                                     _ecore_pending_job_threads_feedback);
246
247         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
248
249         work->self = thread;
250         if (!work->cancel)
251           work->u.feedback_run.func_heavy((Ecore_Thread *) work, (void *) work->data);
252
253         ecore_pipe_write(end_pipe, &work, sizeof (Ecore_Pthread_Worker *));
254      }
255 }
256
257 static void *
258 _ecore_direct_worker(Ecore_Pthread_Worker *work)
259 {
260    Ecore_Pthread_Data *pth;
261
262    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
263    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
264    eina_sched_prio_drop();
265
266    pth = malloc(sizeof (Ecore_Pthread_Data));
267    if (!pth) return NULL;
268
269    pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
270    if (!pth->p)
271      {
272         free(pth);
273         return NULL;
274      }
275    pth->thread = pthread_self();
276
277    work->self = pth->thread;
278    work->u.feedback_run.func_heavy((Ecore_Thread *) work, (void *) work->data);
279
280    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
281
282    work = malloc(sizeof (Ecore_Pthread_Worker));
283    if (!work)
284      {
285         ecore_pipe_del(pth->p);
286         free(pth);
287         return NULL;
288      }
289
290    work->data = pth;
291    work->u.short_run.func_blocking = NULL;
292    work->func_end = (void *) _ecore_thread_end;
293    work->func_cancel = NULL;
294    work->cancel = EINA_FALSE;
295    work->feedback_run = EINA_FALSE;
296    work->kill = EINA_FALSE;
297    work->hash = NULL;
298    pthread_cond_init(&work->cond, NULL);
299    pthread_mutex_init(&work->mutex, NULL);
300
301    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
302
303    return pth->p;
304 }
305
306 static void *
307 _ecore_thread_worker(Ecore_Pthread_Data *pth)
308 {
309    Ecore_Pthread_Worker *work;
310
311    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
312    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
313    eina_sched_prio_drop();
314
315    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
316    _ecore_thread_count++;
317    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
318
319  restart:
320    if (_ecore_pending_job_threads) _ecore_short_job(pth->p);
321    if (_ecore_pending_job_threads_feedback) _ecore_feedback_job(pth->p, pth->thread);
322
323    /* FIXME: Check if there is feedback running task todo, and switch to feedback run handler. */
324
325    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
326    if (_ecore_pending_job_threads)
327      {
328         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
329         goto restart;
330      }
331    if (_ecore_pending_job_threads_feedback)
332      {
333         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
334         goto restart;
335      }
336    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
337
338    /* Sleep a little to prevent premature death */
339    usleep(200);
340
341    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
342    if (_ecore_pending_job_threads)
343      {
344         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
345         goto restart;
346      }
347    if (_ecore_pending_job_threads_feedback)
348      {
349         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
350         goto restart;
351      }
352    _ecore_thread_count--;
353    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
354
355    work = malloc(sizeof (Ecore_Pthread_Worker));
356    if (!work) return NULL;
357
358    work->data = pth;
359    work->u.short_run.func_blocking = NULL;
360    work->func_end = (void *) _ecore_thread_end;
361    work->func_cancel = NULL;
362    work->cancel = EINA_FALSE;
363    work->feedback_run = EINA_FALSE;
364    work->kill = EINA_FALSE;
365    work->hash = NULL;
366    pthread_cond_init(&work->cond, NULL);
367    pthread_mutex_init(&work->mutex, NULL);
368
369    ecore_pipe_write(pth->p, &work, sizeof (Ecore_Pthread_Worker *));
370
371    return pth->p;
372 }
373
374 #endif
375
376 void
377 _ecore_thread_init(void)
378 {
379    _ecore_thread_count_max = eina_cpu_count();
380    if (_ecore_thread_count_max <= 0)
381      _ecore_thread_count_max = 1;
382
383    ECORE_THREAD_PIPE_DEL = ecore_event_type_new();
384 #ifdef EFL_HAVE_PTHREAD
385    del_handler = ecore_event_handler_add(ECORE_THREAD_PIPE_DEL, _ecore_thread_pipe_del, NULL);
386    main_loop_thread = pthread_self();
387    have_main_loop_thread = 1;
388 #endif
389 }
390
391 void
392 _ecore_thread_shutdown(void)
393 {
394    /* FIXME: If function are still running in the background, should we kill them ? */
395 #ifdef EFL_HAVE_PTHREAD
396    Ecore_Pthread_Worker *work;
397    Ecore_Pthread_Data *pth;
398
399    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
400
401    EINA_LIST_FREE(_ecore_pending_job_threads, work)
402      {
403         if (work->func_cancel)
404           work->func_cancel((void *)work->data);
405         free(work);
406      }
407
408    EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work)
409      {
410         if (work->func_cancel)
411           work->func_cancel((void *)work->data);
412         free(work);
413      }
414
415    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
416
417    EINA_LIST_FREE(_ecore_active_job_threads, pth)
418      {
419         Ecore_Pipe *p;
420
421         pthread_cancel(pth->thread);
422         pthread_join(pth->thread, (void **) &p);
423
424         ecore_pipe_del(pth->p);
425      }
426    if (_ecore_thread_global_hash)
427      eina_hash_free(_ecore_thread_global_hash);
428    ecore_event_handler_del(del_handler);
429    have_main_loop_thread = 0;
430    del_handler = NULL;
431 #endif
432 }
433
434 /**
435  * @addtogroup Ecore_Group Ecore - Main Loop and Job Functions.
436  *
437  * @{
438  */
439
440 /**
441  * @addtogroup Ecore_Thread_Group Ecore Thread functions
442  *
443  * These functions allow for ecore-managed threads which integrate with ecore's main loop.
444  *
445  * @{
446  */
447
448 /**
449  * @brief Run some blocking code in a parallel thread to avoid locking the main loop.
450  * @param func_blocking The function that should run in another thread.
451  * @param func_end The function that will be called in the main loop if the thread terminate correctly.
452  * @param func_cancel The function that will be called in the main loop if the thread is cancelled.
453  * @param data User context data to pass to all callback.
454  * @return A reference to the newly created thread instance, or NULL if it failed.
455  *
456  * ecore_thread_run provide a facility for easily managing blocking task in a
457  * parallel thread. You should provide three function. The first one, func_blocking,
458  * that will do the blocking work in another thread (so you should not use the
459  * EFL in it except Eina if you are careful). The second one, func_end,
460  * that will be called in Ecore main loop when func_blocking is done. So you
461  * can use all the EFL inside this function. The last one, func_cancel, will
462  * be called in the main loop if the thread is cancelled or could not run at all.
463  *
464  * Be aware, that you can't make assumption on the result order of func_end
465  * after many call to ecore_thread_run, as we start as much thread as the
466  * host CPU can handle.
467  */
468 EAPI Ecore_Thread *
469 ecore_thread_run(Ecore_Thread_Heavy_Cb func_blocking,
470                  Ecore_Cb func_end,
471                  Ecore_Cb func_cancel,
472                  const void *data)
473 {
474    Ecore_Pthread_Worker *work;
475 #ifdef EFL_HAVE_PTHREAD
476    Ecore_Pthread_Data *pth = NULL;
477 #endif
478
479    if (!func_blocking) return NULL;
480
481    work = malloc(sizeof (Ecore_Pthread_Worker));
482    if (!work)
483      {
484         if (func_cancel)
485           func_cancel((void *) data);
486         return NULL;
487      }
488
489    work->u.short_run.func_blocking = func_blocking;
490    work->func_end = func_end;
491    work->func_cancel = func_cancel;
492    work->cancel = EINA_FALSE;
493    work->feedback_run = EINA_FALSE;
494    work->kill = EINA_FALSE;
495    work->data = data;
496
497 #ifdef EFL_HAVE_PTHREAD
498    work->hash = NULL;
499    pthread_cond_init(&work->cond, NULL);
500    pthread_mutex_init(&work->mutex, NULL);
501
502    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
503    _ecore_pending_job_threads = eina_list_append(_ecore_pending_job_threads, work);
504
505    if (_ecore_thread_count == _ecore_thread_count_max)
506      {
507         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
508         return (Ecore_Thread *) work;
509      }
510
511    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
512
513    /* One more thread could be created. */
514    pth = malloc(sizeof (Ecore_Pthread_Data));
515    if (!pth) goto on_error;
516
517    pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
518    if (!pth->p) goto on_error;
519
520    eina_threads_init();
521
522    if (pthread_create(&pth->thread, NULL, (void *) _ecore_thread_worker, pth) == 0)
523       return (Ecore_Thread *) work;
524
525    eina_threads_shutdown();
526
527  on_error:
528    if (pth)
529      {
530         if (pth->p) ecore_pipe_del(pth->p);
531         pth->p = NULL;
532         free(pth);
533      }
534
535    if (_ecore_thread_count == 0)
536      {
537         pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
538         _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work);
539         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
540
541         if (work->func_cancel)
542           work->func_cancel((void *) work->data);
543         free(work);
544         work = NULL;
545      }
546    return (Ecore_Thread *) work;
547 #else
548    /*
549      If no thread and as we don't want to break app that rely on this
550      facility, we will lock the interface until we are done.
551     */
552    func_blocking((Ecore_Thread *) work, (void *)data);
553    func_end((void *)data);
554
555    return NULL;
556 #endif
557 }
558
559 /**
560  * @brief Cancel a running thread.
561  * @param thread The thread to cancel.
562  * @return Will return EINA_TRUE if the thread has been cancelled,
563  *         EINA_FALSE if it is pending.
564  *
565  * ecore_thread_cancel give the possibility to cancel a task still running. It
566  * will return EINA_FALSE, if the destruction is delayed or EINA_TRUE if it is
567  * cancelled after this call.
568  *
569  * This function work in the main loop and in the thread, but you should not pass
570  * the Ecore_Thread variable from main loop to the worker thread in any structure.
571  * You should always use the one passed to the Ecore_Thread_Heavy_Cb.
572  *
573  * func_end, func_cancel will destroy the handler, so don't use it after.
574  * And if ecore_thread_cancel return EINA_TRUE, you should not use Ecore_Thread also.
575  */
576 EAPI Eina_Bool
577 ecore_thread_cancel(Ecore_Thread *thread)
578 {
579 #ifdef EFL_HAVE_PTHREAD
580    Ecore_Pthread_Worker *work = (Ecore_Pthread_Worker *)thread;
581    Eina_List *l;
582
583    if (!work)
584      return EINA_TRUE;
585    if (work->cancel)
586      return EINA_FALSE;
587
588    if (work->feedback_run)
589      {
590         if (work->kill)
591           return EINA_TRUE;
592         if (work->u.feedback_run.send != work->u.feedback_run.received)
593           goto on_exit;
594      }
595
596    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
597
598    if ((have_main_loop_thread) &&
599        (pthread_equal(main_loop_thread, pthread_self())))
600      {
601         if (!work->feedback_run)
602           EINA_LIST_FOREACH(_ecore_pending_job_threads, l, work)
603             {
604                if ((void *) work == (void *) thread)
605                  {
606                     _ecore_pending_job_threads = eina_list_remove_list(_ecore_pending_job_threads, l);
607
608                     pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
609
610                     if (work->func_cancel)
611                       work->func_cancel((void *) work->data);
612                     free(work);
613
614                     return EINA_TRUE;
615                  }
616             }
617         else
618           EINA_LIST_FOREACH(_ecore_pending_job_threads_feedback, l, work)
619             {
620                if ((void *) work == (void *) thread)
621                  {
622                     _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, l);
623
624                     pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
625
626                     if (work->func_cancel)
627                       work->func_cancel((void *) work->data);
628                     free(work);
629
630                     return EINA_TRUE;
631                  }
632             }
633      }
634
635    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
636
637    /* Delay the destruction */
638  on_exit:
639    ((Ecore_Pthread_Worker *)thread)->cancel = EINA_TRUE;
640    return EINA_FALSE;
641 #else
642    return EINA_TRUE;
643 #endif
644 }
645
646 /**
647  * @brief Tell if a thread was canceled or not.
648  * @param thread The thread to test.
649  * @return EINA_TRUE if the thread is cancelled,
650  *         EINA_FALSE if it is not.
651  *
652  * You can use this function in main loop and in the thread.
653  */
654 EAPI Eina_Bool
655 ecore_thread_check(Ecore_Thread *thread)
656 {
657    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
658
659    if (!worker) return EINA_TRUE;
660    return worker->cancel;
661 }
662
663 /**
664  * @brief Run some heavy code in a parallel thread to avoid locking the main loop.
665  * @param func_heavy The function that should run in another thread.
666  * @param func_notify The function that will receive the data send by func_heavy in the main loop.
667  * @param func_end The function that will be called in the main loop if the thread terminate correctly.
668  * @param func_cancel The function that will be called in the main loop if the thread is cancelled.
669  * @param data User context data to pass to all callback.
670  * @param try_no_queue If you wan't to run outside of the thread pool.
671  * @return A reference to the newly created thread instance, or NULL if it failed.
672  *
673  * ecore_thread_feedback_run provide a facility for easily managing heavy task in a
674  * parallel thread. You should provide four functions. The first one, func_heavy,
675  * that will do the heavy work in another thread (so you should not use the
676  * EFL in it except Eina and Eet if you are careful). The second one, func_notify,
677  * will receive the data send from the thread function (func_heavy) by ecore_thread_notify
678  * in the main loop (and so, can use all the EFL). Tje third, func_end,
679  * that will be called in Ecore main loop when func_heavy is done. So you
680  * can use all the EFL inside this function. The last one, func_cancel, will
681  * be called in the main loop also, if the thread is cancelled or could not run at all.
682  *
683  * Be aware, that you can't make assumption on the result order of func_end
684  * after many call to ecore_feedback_run, as we start as much thread as the
685  * host CPU can handle.
686  *
687  * If you set try_no_queue, it will try to run outside of the thread pool, this can bring
688  * the CPU down, so be careful with that. Of course if it can't start a new thread, it will
689  * try to use one from the pool.
690  */
691 EAPI Ecore_Thread *ecore_thread_feedback_run(Ecore_Thread_Heavy_Cb func_heavy,
692                                              Ecore_Thread_Notify_Cb func_notify,
693                                              Ecore_Cb func_end,
694                                              Ecore_Cb func_cancel,
695                                              const void *data,
696                                              Eina_Bool try_no_queue)
697 {
698
699 #ifdef EFL_HAVE_PTHREAD
700    Ecore_Pthread_Worker *worker;
701    Ecore_Pthread_Data *pth = NULL;
702
703    if (!func_heavy) return NULL;
704
705    worker = malloc(sizeof (Ecore_Pthread_Worker));
706    if (!worker) goto on_error;
707
708    worker->u.feedback_run.func_heavy = func_heavy;
709    worker->u.feedback_run.func_notify = func_notify;
710    worker->hash = NULL;
711    pthread_cond_init(&worker->cond, NULL);
712    pthread_mutex_init(&worker->mutex, NULL);
713    worker->func_cancel = func_cancel;
714    worker->func_end = func_end;
715    worker->data = data;
716    worker->cancel = EINA_FALSE;
717    worker->feedback_run = EINA_TRUE;
718    worker->kill = EINA_FALSE;
719    worker->u.feedback_run.send = 0;
720    worker->u.feedback_run.received = 0;
721
722    worker->u.feedback_run.notify = ecore_pipe_add(_ecore_notify_handler, worker);
723
724    if (!try_no_queue)
725      {
726         pthread_t t;
727
728         if (pthread_create(&t, NULL, (void *) _ecore_direct_worker, worker) == 0)
729            return (Ecore_Thread *) worker;
730      }
731
732    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
733    _ecore_pending_job_threads_feedback = eina_list_append(_ecore_pending_job_threads_feedback, worker);
734
735    if (_ecore_thread_count == _ecore_thread_count_max)
736      {
737         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
738         return (Ecore_Thread *) worker;
739      }
740
741    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
742
743    /* One more thread could be created. */
744    pth = malloc(sizeof (Ecore_Pthread_Data));
745    if (!pth) goto on_error;
746
747    pth->p = ecore_pipe_add(_ecore_thread_handler, NULL);
748    if (!pth->p) goto on_error;
749
750    eina_threads_init();
751
752    if (pthread_create(&pth->thread, NULL, (void *) _ecore_thread_worker, pth) == 0)
753       return (Ecore_Thread *) worker;
754
755    eina_threads_shutdown();
756
757  on_error:
758    if (pth)
759      {
760         if (pth->p) ecore_pipe_del(pth->p);
761         free(pth);
762      }
763
764    if (_ecore_thread_count == 0)
765      {
766         pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
767         _ecore_pending_job_threads_feedback = eina_list_remove(_ecore_pending_job_threads_feedback,
768                                                                worker);
769         pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
770
771         if (func_cancel) func_cancel((void *) data);
772
773         if (worker)
774           {
775              ecore_pipe_del(worker->u.feedback_run.notify);
776              free(worker);
777              worker = NULL;
778           }
779      }
780
781    return (Ecore_Thread *) worker;
782 #else
783    Ecore_Pthread_Worker worker;
784
785    (void) try_no_queue;
786
787    /*
788      If no thread and as we don't want to break app that rely on this
789      facility, we will lock the interface until we are done.
790     */
791    worker.u.feedback_run.func_heavy = func_heavy;
792    worker.u.feedback_run.func_notify = func_notify;
793    worker.u.feedback_run.notify = NULL;
794    worker.u.feedback_run.send = 0;
795    worker.u.feedback_run.received = 0;
796    worker.func_cancel = func_cancel;
797    worker.func_end = func_end;
798    worker.data = data;
799    worker.cancel = EINA_FALSE;
800    worker.feedback_run = EINA_TRUE;
801    worker.kill = EINA_FALSE;
802
803    func_heavy((Ecore_Thread *) &worker, (void *)data);
804
805    if (worker.func_cancel) func_cancel((void *)data);
806    else func_end((void *)data);
807
808    return NULL;
809 #endif
810 }
811
812 /**
813  * @brief Send data to main loop from worker thread.
814  * @param thread The current Ecore_Thread context to send data from
815  * @param data Data to be transmitted to the main loop
816  * @return EINA_TRUE if data was successfully send to main loop,
817  *         EINA_FALSE if anything goes wrong.
818  *
819  * After a succesfull call, the data should be considered owned
820  * by the main loop.
821  *
822  * You should use this function only in the func_heavy call.
823  */
824 EAPI Eina_Bool
825 ecore_thread_feedback(Ecore_Thread *thread, const void *data)
826 {
827    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
828
829    if (!worker) return EINA_FALSE;
830    if (!worker->feedback_run) return EINA_FALSE;
831
832 #ifdef EFL_HAVE_PTHREAD
833    if (!pthread_equal(worker->self, pthread_self())) return EINA_FALSE;
834
835    worker->u.feedback_run.send++;
836    ecore_pipe_write(worker->u.feedback_run.notify, &data, sizeof (void *));
837
838    return EINA_TRUE;
839 #else
840    worker->u.feedback_run.func_notify(thread, (void*) data, (void*) worker->data);
841
842    return EINA_TRUE;
843 #endif
844 }
845
846 /**
847  * @brief Get number of active thread jobs
848  * @return Number of active threads running jobs
849  * This returns the number of threads currently running jobs through the
850  * ecore_thread api.
851  */
852 EAPI int
853 ecore_thread_active_get(void)
854 {
855 #ifdef EFL_HAVE_PTHREAD
856    return _ecore_thread_count;
857 #else
858    return 0;
859 #endif
860 }
861
862 /**
863  * @brief Get number of pending (short) thread jobs
864  * @return Number of pending threads running "short" jobs
865  * This returns the number of threads currently running jobs through the
866  * ecore_thread_run api call.
867  */
868 EAPI int
869 ecore_thread_pending_get(void)
870 {
871    int ret;
872 #ifdef EFL_HAVE_PTHREAD
873    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
874    ret = eina_list_count(_ecore_pending_job_threads);
875    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
876    return ret;
877 #else
878    return 0;
879 #endif
880 }
881
882 /**
883  * @brief Get number of pending feedback thread jobs
884  * @return Number of pending threads running "feedback" jobs
885  * This returns the number of threads currently running jobs through the
886  * ecore_thread_feedback_run api call.
887  */
888 EAPI int
889 ecore_thread_pending_feedback_get(void)
890 {
891    int ret;
892 #ifdef EFL_HAVE_PTHREAD
893    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
894    ret = eina_list_count(_ecore_pending_job_threads_feedback);
895    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
896    return ret;
897 #else
898    return 0;
899 #endif
900 }
901
902 /**
903  * @brief Get number of pending thread jobs
904  * @return Number of pending threads running jobs
905  * This returns the number of threads currently running jobs through the
906  * ecore_thread_run and ecore_thread_feedback_run api calls combined.
907  */
908 EAPI int
909 ecore_thread_pending_total_get(void)
910 {
911    int ret;
912 #ifdef EFL_HAVE_PTHREAD
913    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
914    ret = eina_list_count(_ecore_pending_job_threads) + eina_list_count(_ecore_pending_job_threads_feedback);
915    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
916    return ret;
917 #else
918    return 0;
919 #endif
920 }
921
922 /**
923  * @brief Get the max number of threads that can run simultaneously
924  * @return Max number of threads ecore will run
925  * This returns the total number of threads that ecore will attempt to run
926  * simultaneously.
927  */
928 EAPI int
929 ecore_thread_max_get(void)
930 {
931    return _ecore_thread_count_max;
932 }
933
934 /**
935  * @brief Set the max number of threads that can run simultaneously
936  * @param num The new maximum
937  * This sets the maximum number of threads that ecore will try to run
938  * simultaneously.  This number cannot be < 1 or >= 2x the number of active cpus.
939  */
940 EAPI void
941 ecore_thread_max_set(int num)
942 {
943    if (num < 1) return;
944    /* avoid doing something hilarious by blocking dumb users */
945    if (num >= (2 * eina_cpu_count())) return;
946
947    _ecore_thread_count_max = num;
948 }
949
950 /**
951  * @brief Reset the max number of threads that can run simultaneously
952  * This resets the maximum number of threads that ecore will try to run
953  * simultaneously to the number of active cpus.
954  */
955 EAPI void
956 ecore_thread_max_reset(void)
957 {
958    _ecore_thread_count_max = eina_cpu_count();
959 }
960
961 /**
962  * @brief Get the number of threads which are available to be used
963  * @return The number of available threads
964  * This returns the number of threads slots that ecore has currently available.
965  * Assuming that you haven't changed the max number of threads with @ref ecore_thread_max_set
966  * this should be equal to (num_cpus - (active_running + active_feedback_running))
967  */
968 EAPI int
969 ecore_thread_available_get(void)
970 {
971    int ret;
972 #ifdef EFL_HAVE_PTHREAD
973    pthread_mutex_lock(&_ecore_pending_job_threads_mutex);
974    ret = _ecore_thread_count_max - _ecore_thread_count;
975    pthread_mutex_unlock(&_ecore_pending_job_threads_mutex);
976    return ret;
977 #else
978    return 0;
979 #endif
980 }
981
982 /**
983  * @brief Add data to the thread for subsequent use
984  * @param thread The thread context to add to
985  * @param key The name string to add the data with
986  * @param value The data to add
987  * @param cb The callback to free the data with
988  * @param direct If true, this will not copy the key string (like eina_hash_direct_add)
989  * @return EINA_TRUE on success, EINA_FALSE on failure
990  * This adds data to the thread context, allowing the thread
991  * to retrieve and use it without complicated mutexing.  This function can only be called by a
992  * *_run thread INSIDE the thread and will return EINA_FALSE in any case but success.
993  * All data added to the thread will be freed with its associated callback (if present)
994  * upon thread termination.  If no callback is specified, it is expected that the user will free the
995  * data, but this is most likely not what you want.
996  */
997 EAPI Eina_Bool
998 ecore_thread_local_data_add(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
999 {
1000    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1001    Ecore_Thread_Data *d;
1002    Eina_Bool ret;
1003
1004    if ((!thread) || (!key) || (!value))
1005      return EINA_FALSE;
1006 #ifdef EFL_HAVE_PTHREAD
1007    if (!pthread_equal(worker->self, pthread_self())) return EINA_FALSE;
1008
1009    if (!worker->hash)
1010      worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1011
1012    if (!worker->hash)
1013      return EINA_FALSE;
1014
1015    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1016      return EINA_FALSE;
1017
1018    d->data = value;
1019    d->cb = cb;
1020
1021    if (direct)
1022      ret = eina_hash_direct_add(worker->hash, key, d);
1023    else
1024      ret = eina_hash_add(worker->hash, key, d);
1025    pthread_cond_broadcast(&worker->cond);
1026    return ret;
1027 #else
1028    return EINA_TRUE;
1029 #endif
1030 }
1031
1032 /**
1033  * @brief Modify data in the thread, or add if not found
1034  * @param thread The thread context
1035  * @param key The name string to add the data with
1036  * @param value The data to add
1037  * @param cb The callback to free the data with
1038  * @return The old data associated with @p key on success if modified, NULL if added
1039  * This adds/modifies data in the thread context, adding only if modify fails.
1040  * This function can only be called by a *_run thread INSIDE the thread.
1041  * All data added to the thread pool will be freed with its associated callback (if present)
1042  * upon thread termination.  If no callback is specified, it is expected that the user will free the
1043  * data, but this is most likely not what you want.
1044  */
1045 EAPI void *
1046 ecore_thread_local_data_set(Ecore_Thread *thread, const char *key, void *value, Eina_Free_Cb cb)
1047 {
1048    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1049    Ecore_Thread_Data *d, *r;
1050    void *ret;
1051    if ((!thread) || (!key) || (!value))
1052      return NULL;
1053 #ifdef EFL_HAVE_PTHREAD
1054    if (!pthread_equal(worker->self, pthread_self())) return NULL;
1055
1056    if (!worker->hash)
1057      worker->hash = eina_hash_string_small_new(_ecore_thread_data_free);
1058
1059    if (!worker->hash)
1060      return NULL;
1061
1062    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1063      return NULL;
1064
1065    d->data = value;
1066    d->cb = cb;
1067
1068    r = eina_hash_set(worker->hash, key, d);
1069    pthread_cond_broadcast(&worker->cond);
1070    ret = r->data;
1071    free(r);
1072    return ret;
1073 #else
1074    return NULL;
1075 #endif
1076 }
1077
1078 /**
1079  * @brief Find data in the thread's data
1080  * @param thread The thread context
1081  * @param key The name string the data is associated with
1082  * @return The value, or NULL on error
1083  * This finds data in the thread context that has been previously added with @ref ecore_thread_local_data_add
1084  * This function can only be called by a *_run thread INSIDE the thread, and will return NULL
1085  * in any case but success.
1086  */
1087
1088 EAPI void *
1089 ecore_thread_local_data_find(Ecore_Thread *thread, const char *key)
1090 {
1091    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1092    Ecore_Thread_Data *d;
1093
1094    if ((!thread) || (!key))
1095      return NULL;
1096 #ifdef EFL_HAVE_PTHREAD
1097    if (!pthread_equal(worker->self, pthread_self())) return NULL;
1098
1099    if (!worker->hash)
1100      return NULL;
1101
1102    d = eina_hash_find(worker->hash, key);
1103    return d->data;
1104 #else
1105    return NULL;
1106 #endif
1107 }
1108
1109 /**
1110  * @brief Delete data from the thread's data
1111  * @param thread The thread context
1112  * @param key The name string the data is associated with
1113  * @return EINA_TRUE on success, EINA_FALSE on failure
1114  * This deletes the data pointer from the thread context which was previously added with @ref ecore_thread_local_data_add
1115  * This function can only be called by a *_run thread INSIDE the thread, and will return EINA_FALSE
1116  * in any case but success.  Note that this WILL free the data if a callback was specified.
1117  */
1118 EAPI Eina_Bool
1119 ecore_thread_local_data_del(Ecore_Thread *thread, const char *key)
1120 {
1121    Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *) thread;
1122    Ecore_Thread_Data *d;
1123    if ((!thread) || (!key))
1124      return EINA_FALSE;
1125 #ifdef EFL_HAVE_PTHREAD
1126    if (!pthread_equal(worker->self, pthread_self())) return EINA_FALSE;
1127
1128    if (!worker->hash)
1129      return EINA_FALSE;
1130    if ((d = eina_hash_find(worker->hash, key)))
1131      _ecore_thread_data_free(d);
1132    return eina_hash_del_by_key(worker->hash, key);
1133 #else
1134    return EINA_TRUE;
1135 #endif
1136 }
1137
1138 /**
1139  * @brief Add data to the global data
1140  * @param key The name string to add the data with
1141  * @param value The data to add
1142  * @param cb The optional callback to free the data with once ecore is shut down
1143  * @param direct If true, this will not copy the key string (like eina_hash_direct_add)
1144  * @return EINA_TRUE on success, EINA_FALSE on failure
1145  * This adds data to the global thread data, and will return EINA_FALSE in any case but success.
1146  * All data added to global can be manually freed, or a callback can be provided with @p cb which will
1147  * be called upon ecore_thread shutting down.  Note that if you have manually freed data that a callback
1148  * was specified for, you will most likely encounter a segv later on.
1149  */
1150 EAPI Eina_Bool
1151 ecore_thread_global_data_add(const char *key, void *value, Eina_Free_Cb cb, Eina_Bool direct)
1152 {
1153    Eina_Bool ret;
1154    Ecore_Thread_Data *d;
1155
1156    if ((!key) || (!value))
1157      return EINA_FALSE;
1158 #ifdef EFL_HAVE_PTHREAD
1159    pthread_rwlock_wrlock(&_ecore_thread_global_hash_lock);
1160    if (!_ecore_thread_global_hash)
1161      _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1162    pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1163
1164    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1165      return EINA_FALSE;
1166
1167    d->data = value;
1168    d->cb = cb;
1169
1170    if (!_ecore_thread_global_hash)
1171      return EINA_FALSE;
1172    pthread_rwlock_wrlock(&_ecore_thread_global_hash_lock);
1173    if (direct)
1174      ret = eina_hash_direct_add(_ecore_thread_global_hash, key, d);
1175    else
1176      ret = eina_hash_add(_ecore_thread_global_hash, key, d);
1177    pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1178    pthread_cond_broadcast(&_ecore_thread_global_hash_cond);
1179    return ret;
1180 #else
1181    return EINA_TRUE;
1182 #endif
1183 }
1184
1185 /**
1186  * @brief Add data to the global data
1187  * @param key The name string to add the data with
1188  * @param value The data to add
1189  * @param cb The optional callback to free the data with once ecore is shut down
1190  * @return An Ecore_Thread_Data on success, NULL on failure
1191  * This adds data to the global thread data and returns NULL, or replaces the previous data
1192  * associated with @p key and returning the previous data if it existed.  To see if an error occurred,
1193  * one must use eina_error_get.
1194  * All data added to global can be manually freed, or a callback can be provided with @p cb which will
1195  * be called upon ecore_thread shutting down.  Note that if you have manually freed data that a callback
1196  * was specified for, you will most likely encounter a segv later on.
1197  */
1198 EAPI void *
1199 ecore_thread_global_data_set(const char *key, void *value, Eina_Free_Cb cb)
1200 {
1201    Ecore_Thread_Data *d, *r;
1202    void *ret;
1203
1204    if ((!key) || (!value))
1205      return NULL;
1206 #ifdef EFL_HAVE_PTHREAD
1207    pthread_rwlock_wrlock(&_ecore_thread_global_hash_lock);
1208    if (!_ecore_thread_global_hash)
1209      _ecore_thread_global_hash = eina_hash_string_small_new(_ecore_thread_data_free);
1210    pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1211
1212    if (!_ecore_thread_global_hash)
1213      return NULL;
1214
1215    if (!(d = malloc(sizeof(Ecore_Thread_Data))))
1216      return NULL;
1217
1218    d->data = value;
1219    d->cb = cb;
1220
1221    pthread_rwlock_wrlock(&_ecore_thread_global_hash_lock);
1222    r = eina_hash_set(_ecore_thread_global_hash, key, d);
1223    pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1224    pthread_cond_broadcast(&_ecore_thread_global_hash_cond);
1225
1226    ret = r->data;
1227    free(r);
1228    return ret;
1229 #else
1230    return NULL;
1231 #endif
1232 }
1233
1234 /**
1235  * @brief Find data in the global data
1236  * @param key The name string the data is associated with
1237  * @return The value, or NULL on error
1238  * This finds data in the global data that has been previously added with @ref ecore_thread_global_data_add
1239  * This function will return NULL in any case but success.
1240  * All data added to global can be manually freed, or a callback can be provided with @p cb which will
1241  * be called upon ecore_thread shutting down.  Note that if you have manually freed data that a callback
1242  * was specified for, you will most likely encounter a segv later on.
1243  * @note Keep in mind that the data returned can be used by multiple threads at a time, so you will most likely want to mutex
1244  * if you will be doing anything with it.
1245  */
1246
1247 EAPI void *
1248 ecore_thread_global_data_find(const char *key)
1249 {
1250    Ecore_Thread_Data *ret;
1251    if (!key)
1252      return NULL;
1253 #ifdef EFL_HAVE_PTHREAD
1254    if (!_ecore_thread_global_hash) return NULL;
1255
1256    pthread_rwlock_rdlock(&_ecore_thread_global_hash_lock);
1257    ret = eina_hash_find(_ecore_thread_global_hash, key);
1258    pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1259    return ret->data;
1260 #else
1261    return NULL;
1262 #endif
1263 }
1264
1265 /**
1266  * @brief Delete data from the global data
1267  * @param key The name string the data is associated with
1268  * @return EINA_TRUE on success, EINA_FALSE on failure
1269  * This deletes the data pointer from the global data which was previously added with @ref ecore_thread_global_data_add
1270  * This function will return EINA_FALSE in any case but success.
1271  * Note that this WILL free the data if an @c Eina_Free_Cb was specified when the data was added.
1272  */
1273 EAPI Eina_Bool
1274 ecore_thread_global_data_del(const char *key)
1275 {
1276    Eina_Bool ret;
1277    Ecore_Thread_Data *d;
1278
1279    if (!key)
1280      return EINA_FALSE;
1281 #ifdef EFL_HAVE_PTHREAD
1282    if (!_ecore_thread_global_hash)
1283      return EINA_FALSE;
1284
1285    pthread_rwlock_wrlock(&_ecore_thread_global_hash_lock);
1286    if ((d = eina_hash_find(_ecore_thread_global_hash, key)))
1287      _ecore_thread_data_free(d);
1288    ret = eina_hash_del_by_key(_ecore_thread_global_hash, key);
1289    pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1290    return ret;
1291 #else
1292    return EINA_TRUE;
1293 #endif
1294 }
1295
1296 /**
1297  * @brief Find data in the global data and optionally wait for the data if not found
1298  * @param key The name string the data is associated with
1299  * @param seconds The amount of time in seconds to wait for the data.  If 0, the call will be async and not wait for data.
1300  * If < 0 the call will wait indefinitely for the data.
1301  * @return The value, or NULL on failure
1302  * This finds data in the global data that has been previously added with @ref ecore_thread_global_data_add
1303  * This function will return NULL in any case but success.
1304  * Use @p seconds to specify the amount of time to wait.  Use > 0 for an actual wait time, 0 to not wait, and < 0 to wait indefinitely.
1305  * @note Keep in mind that the data returned can be used by multiple threads at a time, so you will most likely want to mutex
1306  * if you will be doing anything with it.
1307  */
1308 EAPI void *
1309 ecore_thread_global_data_wait(const char *key, double seconds)
1310 {
1311    double time = 0;
1312    Ecore_Thread_Data *ret = NULL;
1313    if (!key)
1314      return NULL;
1315 #ifdef EFL_HAVE_PTHREAD
1316    if (!_ecore_thread_global_hash)
1317      return NULL;
1318    if (seconds > 0)
1319      time = ecore_time_get() + seconds;
1320
1321    while (1)
1322      {
1323         struct timespec t = { 0, 0 };
1324
1325         t.tv_sec = (long int)time;
1326         t.tv_nsec = (long int)((time - (double)t.tv_sec) * 1000000000);
1327         pthread_rwlock_rdlock(&_ecore_thread_global_hash_lock);
1328         ret = eina_hash_find(_ecore_thread_global_hash, key);
1329         pthread_rwlock_unlock(&_ecore_thread_global_hash_lock);
1330         if ((ret) || (!seconds) || ((seconds > 0) && (time <= ecore_time_get())))
1331           break;
1332         pthread_mutex_lock(&_ecore_thread_global_hash_mutex);
1333         pthread_cond_timedwait(&_ecore_thread_global_hash_cond, &_ecore_thread_global_hash_mutex, &t);
1334         pthread_mutex_unlock(&_ecore_thread_global_hash_mutex);
1335      }
1336    if (ret) return ret->data;
1337    return NULL;
1338 #else
1339    return NULL;
1340 #endif
1341 }
1342
1343 /**
1344  * @}
1345  */
1346
1347 /**
1348  * @}
1349  */