Some example for Ecore_Thread
authorsachiel <sachiel@7cbeb6ba-43b4-40fd-8cce-4c39aea84d33>
Tue, 12 Jul 2011 13:38:25 +0000 (13:38 +0000)
committersachiel <sachiel@7cbeb6ba-43b4-40fd-8cce-4c39aea84d33>
Tue, 12 Jul 2011 13:38:25 +0000 (13:38 +0000)
It has some issues that need debugging, I'm not sure if Ecore_Thread doesn't
like having stuff done before the main loop is running and valgrind was also
complaining about some invalid reads in ecore_thread.c, but at least the
example is there for people to look at and report problems, things not clear
or just about anything that may come up from it.

git-svn-id: http://svn.enlightenment.org/svn/e/trunk/ecore@61293 7cbeb6ba-43b4-40fd-8cce-4c39aea84d33

doc/examples.dox
src/examples/Makefile.am
src/examples/ecore_thread_example.c [new file with mode: 0644]
src/lib/ecore/Ecore.h

index 2dbd6d6..09984f2 100644 (file)
  * @until }
  *
  * @example ecore_animator_example.c
- */
\ No newline at end of file
+ */
+
+/**
+ * @page ecore_thread_example_c Ecore_Thread - API overview
+ *
+ * Working with threads is hard. Ecore helps to do so a bit easier, but as
+ * the example in @ref ecore_thread_example.c "ecore_thread_example.c" shows,
+ * there's a lot to consider even when doing the most simple things.
+ *
+ * We'll be going through this thorough example now, showing how the differents
+ * aspects of @ref Ecore_Thread are used, but users are encourage to avoid
+ * threads unless it's really the only option, as they always add more
+ * complexity than the program usually requires.
+ *
+ * Ecore Threads come in two flavors, short jobs and feedback jobs. Short jobs
+ * just run the given function and are more commonly used for small tasks
+ * where the main loop does not need to know how the work is going in between.
+ * The short job in our example is so short we had to artifically enlarge it
+ * with @c sleep(). Other than that, it also uses threads local data to keep
+ * the data we are working with persistent across different jobs ran by the
+ * same system thread. This data will be freed when the no more jobs are
+ * pending and the thread is terminated. If the data doesn't exist in the
+ * thread's storage, we create it and save it there for future jobs to find
+ * it. If creation fails, we cancel ourselves, so the main loop knows that
+ * we didn't just exit normally, meaning the job could not be done. The main
+ * part of the function checks in each iteration if it was cancelled by the
+ * main loop, and if it was, it stops processing and clears the data from the
+ * storage (we assume @c cancel means no one else will need this, but this is
+ * really application dependant).
+ * @dontinclude ecore_thread_example.c
+ * @skip static void
+ * @until sleep(1)
+ * @until }
+ * @until }
+ *
+ * Feedback jobs, on the other hand, run tasks that will inform back to the
+ * main loop its progress, send partial data as is processed, just ping saying
+ * it's still alive and processing, or anything that needs the thread to talk
+ * back to the main loop.
+ * @skip static void
+ * @until the_end
+ * @until }
+ *
+ * Finally, one more feedback job, but this one will be running outside of
+ * Ecore's pool, so we can use the pool for real work and keep this very
+ * light function unchecked. All it does is check if some condition is met
+ * and send a message to the main loop telling it it's time to close.
+ * @skip static void
+ * @until }
+ * @until }
+ * @until }
+ *
+ * Every now and then the program prints its status, counting threads running
+ * and pending jobs.
+ * @skip static void
+ * @until }
+ *
+ * In our main loop, we'll be receiving messages from our feedback jobs using
+ * the same callback for both of them.
+ * @skip static void
+ * @until char *str
+ *
+ * The light job running out of the pool will let us know when we can exit our
+ * program.
+ * @until }
+ *
+ * Next comes the handling of data sent from the actual worker threads, always
+ * remembering that the data belongs to us now, and not the thread, so it's
+ * our responsibility to free it.
+ * @until }
+ * @until }
+ *
+ * Last, the condition to exit is given by how many messages we want to handle,
+ * so we need to count them and inform the condition checking thread that the
+ * value changed.
+ * @until }
+ *
+ * When a thread finishes its job or gets cancelled, the main loop is notified
+ * through the callbacks set when creating the task. In this case, we just
+ * print what happen and keep track of one of them used to exemplify cancelling.
+ * Here we are pretending one of our short jobs has a timeout, so if it doesn't
+ * finish before a timer is triggered, it will be cancelled.
+ * @skip static void
+ * @until _cancel_timer_cb
+ * @until }
+ *
+ * The main function does some setup that includes reading parameters from
+ * the command line to change its behaviour and test different results.
+ * These are:
+ * @li -t <some_num> maximum number of threads to run at the same time.
+ * @li -p <some_path> adds @c some_path to the list used by the feedback jobs.
+ * This parameter can be used multiple times.
+ * @li -m <some_num> the number of messages to process before the program is
+ * signalled to exit.
+ *
+ * Skipping some bits, we init Ecore and our application data.
+ * @skip ecore_init
+ * @until appdata.max_msgs
+ *
+ * If any paths for the feedback jobs were given, we use them, otherwise we
+ * fallback to some defaults. Always initting the proper mutexes used by the
+ * threaded job.
+ * @skip path_list
+ * @until EINA_LIST_FREE
+ * @until }
+ * @until }
+ *
+ * Initialize the mutex needed for the condition checking thread
+ * @skip appdata.mutex
+ * @until appdata.condition
+ *
+ * And start our tasks.
+ * @until appdata.thread_3
+ * @until EINA_FALSE
+ *
+ * To finalize, set a timer to cancel one of the tasks if it doesn't end
+ * before the timeout, one more timer for status report and get into the main
+ * loop. Once we are out, destroy our mutexes and finish the program.
+ * @until _status_timer_cb
+ * @until }
+ *
+ * @example ecore_thread_example.c
+ */
index 2b154d0..a6614fa 100644 (file)
@@ -37,7 +37,8 @@ SRCS = \
        ecore_fd_handler_gnutls_example.c \
        ecore_file_download_example.c \
        ecore_pipe_simple_example.c \
-       ecore_pipe_gstreamer_example.c
+       ecore_pipe_gstreamer_example.c \
+       ecore_thread_example.c
 
 EXTRA_DIST = $(SRCS)
 
@@ -61,7 +62,8 @@ pkglib_PROGRAMS += \
        ecore_pipe_simple_example \
        ecore_con_lookup_example \
        ecore_con_url_headers_example \
-       ecore_con_url_download_example
+       ecore_con_url_download_example \
+       ecore_thread_example
 
 ecore_animator_example_LDADD = $(ECOREBASELDADD) @EVAS_LIBS@ $(top_builddir)/src/lib/ecore_evas/libecore_evas.la
 ecore_con_lookup_example_LDADD = $(ECOREBASELDADD) $(top_builddir)/src/lib/ecore_con/libecore_con.la
diff --git a/src/examples/ecore_thread_example.c b/src/examples/ecore_thread_example.c
new file mode 100644 (file)
index 0000000..7028b25
--- /dev/null
@@ -0,0 +1,394 @@
+/*
+ * gcc -o ecore_thread_example ecore_thread_example.c `pkg-config --cflags --libs ecore`
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <dirent.h>
+#include <Ecore.h>
+#include <Ecore_Getopt.h>
+
+typedef struct
+{
+   Ecore_Thread *thread_3;
+   int msgs_received;
+   int max_msgs;
+   Eina_Lock mutex;
+   Eina_Condition condition;
+} App_Data;
+
+typedef struct
+{
+   Eina_List *list;
+} Thread_Data;
+
+typedef struct
+{
+   char *name;
+   char *base;
+   Eina_Lock mutex;
+} Feedback_Thread_Data;
+
+typedef struct
+{
+   int all_done;
+   Eina_List *list;
+} App_Msg;
+
+static void
+_local_data_free(void *data)
+{
+   Thread_Data *td = data;
+   char *str;
+
+   EINA_LIST_FREE(td->list, str)
+     {
+        printf("Freeing string: %s\n", str);
+        free(str);
+     }
+   free(td);
+}
+
+static void
+_short_job(void *data, Ecore_Thread *th)
+{
+   Thread_Data *td;
+   int i;
+
+   td = ecore_thread_local_data_find(th, "data");
+   if (!td)
+     {
+        td = calloc(1, sizeof(Thread_Data));
+        if (!td)
+          {
+             ecore_thread_cancel(th);
+             return;
+          }
+        ecore_thread_local_data_add(th, "data", td, _local_data_free,
+                                    EINA_FALSE);
+     }
+
+   for (i = 0; i < 10; i++)
+     {
+        char buf[200];
+
+        if (ecore_thread_check(th))
+          {
+             ecore_thread_local_data_del(th, "data");
+             break;
+          }
+
+        snprintf(buf, sizeof(buf), "Thread %p: String number %d", th, i);
+        td->list = eina_list_append(td->list, strdup(buf));
+        sleep(1);
+     }
+}
+
+static void
+_feedback_job(void *data, Ecore_Thread *th)
+{
+   time_t t;
+   int i, count;
+   Feedback_Thread_Data *ftd = NULL;
+   DIR *dir;
+   App_Msg *msg;
+
+   count = (int)ecore_thread_global_data_find("count");
+   for (i = 0; i < count; i++)
+     {
+        char buf[32];
+        snprintf(buf, sizeof(buf), "data%d", i);
+        ftd = ecore_thread_global_data_find(buf);
+        if (!ftd)
+          continue;
+        if (eina_lock_take_try(&ftd->mutex))
+          break;
+        else
+          ftd = NULL;
+     }
+   if (!ftd)
+     return;
+
+   dir = opendir(ftd->base);
+   if (!dir)
+     goto the_end;
+
+   msg = calloc(1, sizeof(App_Msg));
+
+   t = time(NULL);
+   while (time(NULL) < t + 2)
+     {
+        struct dirent entry, *result;
+
+        if (readdir_r(dir, &entry, &result))
+          break;
+        if (!result)
+          break;
+
+        if (strlen(result->d_name) >= 10)
+          msg->list = eina_list_append(msg->list,
+                                       strdup(result->d_name));
+     }
+
+   closedir(dir);
+   ecore_thread_feedback(th, msg);
+
+the_end:
+   ecore_thread_global_data_del(ftd->name);
+   free(ftd->name);
+   free(ftd->base);
+   eina_lock_release(&ftd->mutex);
+   eina_lock_free(&ftd->mutex);
+   free(ftd);
+   ecore_thread_reschedule(th);
+}
+
+static void
+_out_of_pool_job(void *data, Ecore_Thread *th)
+{
+   App_Data *ad = data;
+   App_Msg *msg;
+
+   while (1)
+     {
+        int msgs;
+        eina_condition_wait(&ad->condition);
+        msgs = ad->msgs_received;
+        eina_lock_release(&ad->mutex);
+        if (msgs == ad->max_msgs)
+          {
+             msg = calloc(1, sizeof(App_Msg));
+             msg->all_done = 1;
+             ecore_thread_feedback(th, msg);
+             return;
+          }
+     }
+}
+
+static void
+_print_status(void)
+{
+   int active, pending_total, pending_feedback, pending_short, available;
+
+   active = ecore_thread_active_get();
+   pending_total = ecore_thread_pending_total_get();
+   pending_feedback = ecore_thread_pending_feedback_get();
+   pending_short = ecore_thread_pending_get();
+   available = ecore_thread_available_get();
+
+   printf("Status:\n\t* Active threads: %d\n"
+          "\t* Available threads: %d\n"
+          "\t* Pending short jobs: %d\n"
+          "\t* Pending feedback jobs: %d\n"
+          "\t* Pending total: %d\n", active, available, pending_short,
+          pending_feedback, pending_total);
+}
+
+static void
+_feedback_job_msg_cb(void *data, Ecore_Thread *th, void *msg_data)
+{
+   App_Data *ad = data;
+   App_Msg *msg = msg_data;
+   char *str;
+
+   if (msg->all_done)
+     {
+        ecore_main_loop_quit();
+        free(msg);
+        return;
+     }
+
+   _print_status();
+
+   if (!msg->list)
+     printf("Received an empty list from thread %p\n", th);
+   else
+     {
+        int i = 0;
+        printf("Received %d elements from threads %p (printing first 5):\n",
+               eina_list_count(msg->list), th);
+        EINA_LIST_FREE(msg->list, str)
+          {
+             if (i <= 5)
+               printf("\t%s\n", str);
+             free(str);
+             i++;
+          }
+     }
+
+   eina_lock_take(&ad->mutex);
+   ad->msgs_received++;
+   eina_condition_signal(&ad->condition);
+   eina_lock_release(&ad->mutex);
+
+   free(msg);
+}
+
+static void
+_thread_end_cb(void *data, Ecore_Thread *th)
+{
+   App_Data *ad = data;
+
+   printf("Normal termination for thread %p.\n", th);
+   if (th == ad->thread_3)
+     ad->thread_3 = NULL;
+}
+
+static void
+_thread_cancel_cb(void *data, Ecore_Thread *th)
+{
+   App_Data *ad = data;
+
+   printf("Thread %p got cancelled.\n", th);
+   if (th == ad->thread_3)
+     ad->thread_3 = NULL;
+}
+
+static Eina_Bool
+_cancel_timer_cb(void *data)
+{
+   App_Data *ad = data;
+
+   if (ad->thread_3 && !ecore_thread_check(ad->thread_3))
+     ecore_thread_cancel(ad->thread_3);
+
+   return EINA_FALSE;
+}
+
+static Eina_Bool
+_status_timer_cb(void *data)
+{
+   _print_status();
+
+   return EINA_TRUE;
+}
+
+static const Ecore_Getopt optdesc = {
+   "ecore_thread_example",
+   NULL,
+   "0.0",
+   "(C) 2011 Enlightenment",
+   "Public domain?",
+   "Example program for Ecore_Thread",
+   0,
+   {
+      ECORE_GETOPT_STORE_INT('t', "threads", "Max number of threads to run"),
+      ECORE_GETOPT_STORE_INT('m', "msgs", "Max number of messages to receive"),
+      ECORE_GETOPT_APPEND_METAVAR('p', "path", "Add path for feedback job",
+                                  "STRING", ECORE_GETOPT_TYPE_STR),
+      ECORE_GETOPT_HELP('h', "help"),
+      ECORE_GETOPT_SENTINEL
+   }
+};
+
+int
+main(int argc, char *argv[])
+{
+   int i, max_threads = 0, max_msgs = 0;
+   Eina_Bool opt_quit = EINA_FALSE;
+   Eina_List *path_list = NULL;
+   App_Data appdata;
+   Ecore_Getopt_Value values[] = {
+        ECORE_GETOPT_VALUE_INT(max_threads),
+        ECORE_GETOPT_VALUE_INT(max_msgs),
+        ECORE_GETOPT_VALUE_LIST(path_list),
+        ECORE_GETOPT_VALUE_BOOL(opt_quit),
+        ECORE_GETOPT_VALUE_NONE
+   };
+
+   ecore_init();
+
+   i = ecore_thread_max_get();
+   printf("Initial max threads: %d\n", i);
+
+   memset(&appdata, 0, sizeof(App_Data));
+   appdata.max_msgs = 1;
+
+   if (ecore_getopt_parse(&optdesc, values, argc, argv) < 0)
+     {
+        printf("Argument parsing failed\n");
+        return 1;
+     }
+
+   if (opt_quit)
+     return 0;
+
+   if (max_threads)
+     {
+        ecore_thread_max_set(max_threads);
+        printf("Max threads: %d\n", ecore_thread_max_get());
+     }
+   if (max_msgs)
+     appdata.max_msgs = max_msgs;
+
+   if (!path_list)
+     {
+        Feedback_Thread_Data *ftd;
+        ecore_thread_global_data_add("count", (void *)3, NULL, EINA_FALSE);
+        ftd = calloc(1, sizeof(Feedback_Thread_Data));
+        ftd->name = strdup("data0");
+        ftd->base = strdup("/usr/bin");
+        eina_lock_new(&ftd->mutex);
+        ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
+        ftd = calloc(1, sizeof(Feedback_Thread_Data));
+        ftd->name = strdup("data1");
+        ftd->base = strdup("/usr/lib");
+        eina_lock_new(&ftd->mutex);
+        ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
+        ftd = calloc(1, sizeof(Feedback_Thread_Data));
+        ftd->name = strdup("data2");
+        ftd->base = strdup("/usr/share");
+        eina_lock_new(&ftd->mutex);
+        ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
+     }
+   else
+     {
+        Feedback_Thread_Data *ftd;
+        char *str;
+        ecore_thread_global_data_add("count",
+                                     (void *)eina_list_count(path_list), NULL,
+                                     EINA_FALSE);
+        i = 0;
+        EINA_LIST_FREE(path_list, str)
+          {
+             char buf[32];
+             snprintf(buf, sizeof(buf), "data%d", i);
+             ftd = calloc(1, sizeof(Feedback_Thread_Data));
+             ftd->name = strdup(buf);
+             ftd->base = strdup(str);
+             eina_lock_new(&ftd->mutex);
+             ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
+             free(str);
+             i++;
+          }
+     }
+
+   eina_lock_new(&appdata.mutex);
+   eina_condition_new(&appdata.condition, &appdata.mutex);
+
+   ecore_thread_feedback_run(_out_of_pool_job, _feedback_job_msg_cb, NULL,
+                             NULL, &appdata, EINA_TRUE);
+
+   ecore_thread_run(_short_job, _thread_end_cb, _thread_cancel_cb, &appdata);
+   ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb,
+                             _thread_end_cb, _thread_cancel_cb, &appdata,
+                             EINA_FALSE);
+   appdata.thread_3 = ecore_thread_run(_short_job, _thread_end_cb,
+                                       _thread_cancel_cb, &appdata);
+   ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb,
+                             _thread_end_cb, _thread_cancel_cb, &appdata,
+                             EINA_FALSE);
+
+   ecore_timer_add(1.0, _cancel_timer_cb, &appdata);
+   ecore_timer_add(2.0, _status_timer_cb, NULL);
+
+   _print_status();
+
+   ecore_main_loop_begin();
+
+   eina_condition_free(&appdata.condition);
+   eina_lock_free(&appdata.mutex);
+
+   ecore_shutdown();
+
+   return 0;
+}
index 3f7ec73..c83bfd4 100644 (file)
@@ -826,6 +826,8 @@ extern "C" {
    * for the function running in the thread to send messages to the main
    * thread.
    *
+   * See an overview example in @ref ecore_thread_example_c.
+   *
    * @ingroup Ecore_Group
    *
    * @{