ecore: add Ecore_Coroutine.
authorCedric BAIL <cedric@efl.so>
Mon, 18 Feb 2013 13:32:51 +0000 (14:32 +0100)
committerCedric BAIL <cedric@efl.so>
Mon, 18 Feb 2013 13:38:33 +0000 (14:38 +0100)
That work clearly was possible thanks to Leandro. If you want more information
go to his blog : http://tia.mat.br/posts/async_io_with_coroutines/ .

The main difference with his implementation is more portable and not thread safe.
It does not have a custom swapcontext (would make sense as we don't need to save
the sigcontext) so it will be less fast. If people are ready to contribute asm
patch for that purpose I will be happy to apply them.

As for portability this code should work on all architecture we already support
thanks to a nice hack with setjmp/longjmp borowed from libcoroutine. We do use
Fiber for Windows support, but as 1.8 is completely borken in that regard, this
is theorical work only.

Thinks left to do :
- Eoify the API
- Documentation
- More tests
- Add support for coroutine in fd handler
- Add coroutine support to ecore_thread api
- Write some example

configure.ac
src/Makefile_Ecore.am
src/lib/ecore/Ecore.h
src/lib/ecore/ecore.c
src/lib/ecore/ecore_coroutine.c [new file with mode: 0644]
src/lib/ecore/ecore_private.h
src/tests/ecore/ecore_suite.c
src/tests/ecore/ecore_suite.h
src/tests/ecore/ecore_test_coroutine.c [new file with mode: 0644]

index 3ab7427..31308a2 100644 (file)
@@ -1805,6 +1805,51 @@ fi
 
 EFL_ADD_LIBS([ECORE], [${LTLIBINTL}])
 
+# coroutine function specific
+
+AC_COMPILE_IFELSE(
+   [AC_LANG_PROGRAM(
+       [[
+#include <ucontext.h>
+       ]],
+       [[
+ucontext_t test;
+getcontext(&test);
+       ]])],
+   [have_ucontext="yes"],
+   [have_ucontext="no"])
+
+AC_MSG_CHECKING([for ucontext])
+AC_MSG_RESULT([${have_ucontext}])
+
+AC_COMPILE_IFELSE(
+   [AC_LANG_PROGRAM(
+       [[
+#include <setjmp.h>
+       ]],
+       [[
+jmp_buf context;
+setjmp(&context);
+       ]])],
+   [have_setjmp="yes"],
+   [have_setjmp="no"])
+
+AC_MSG_CHECKING([for setjmp])
+AC_MSG_RESULT([${have_setjmp}])
+
+if test "X${have_windows}" = "xyes"; then
+   AC_DEFINE(USE_FIBER, 1, [Define to 1 if you have Windows Fiber support.])
+   EFL_ADD_FEATURE([system], [coroutine], [fiber])
+elif test "x${have_ucontext}" = "xyes"; then
+   AC_DEFINE(USE_UCONTEXT, 1, [Define to 1 if you have posix ucontext functions.])
+   EFL_ADD_FEATURE([system], [coroutine], [ucontext])
+elif test "x${have_setjmp}" = "xyes"; then
+   AC_DEFINE(USE_SETJMP, 1, [Define to 1 if you have setjmp/longjmp functions.])
+   EFL_ADD_FEATURE([system], [coroutine], [setjmp])
+else
+   AC_MSG_ERROR([You don't have a working way to implement coroutine. Exiting...])
+fi
+
 ### Check availability
 
 EFL_LIB_END([Ecore])
index e901c7f..b409615 100644 (file)
@@ -27,6 +27,7 @@ lib/ecore/ecore_time.c \
 lib/ecore/ecore_timer.c \
 lib/ecore/ecore_thread.c \
 lib/ecore/ecore_throttle.c \
+lib/ecore/ecore_coroutine.c \
 lib/ecore/ecore_private.h
 
 if HAVE_WIN32
@@ -66,6 +67,7 @@ tests/ecore/ecore_test_ecore.c \
 tests/ecore/ecore_test_ecore_con.c \
 tests/ecore/ecore_test_ecore_x.c \
 tests/ecore/ecore_test_ecore_imf.c \
+tests/ecore/ecore_test_coroutine.c \
 tests/ecore/ecore_suite.h
 
 tests_ecore_ecore_suite_CPPFLAGS = \
index 34210f9..2f44326 100644 (file)
@@ -2779,6 +2779,27 @@ EAPI void *ecore_thread_global_data_wait(const char *key, double seconds);
  * @}
  */
 
+typedef struct _Ecore_Coroutine Ecore_Coroutine;
+typedef int (*Ecore_Coroutine_Cb)(void *data, Ecore_Coroutine *coro);
+
+typedef enum {
+  ECORE_COROUTINE_NEW,
+  ECORE_COROUTINE_RUNNING,
+  ECORE_COROUTINE_FINISHED
+} Ecore_Coroutine_State;
+
+EAPI Ecore_Coroutine *ecore_coroutine_add(int stack_size, Ecore_Coroutine_Cb func, void *data);
+EAPI void *ecore_coroutine_del(Ecore_Coroutine *coro);
+
+EAPI int ecore_coroutine_resume(Ecore_Coroutine *coro);
+EAPI void ecore_coroutine_yield(Ecore_Coroutine *coro, int value);
+
+EAPI void *ecore_coroutine_data_get(Ecore_Coroutine *coro);
+EAPI Ecore_Coroutine_State ecore_coroutine_state_get(Ecore_Coroutine *coro);
+
+EAPI void ecore_coroutine_defer(Ecore_Coroutine *coro, Eina_Free_Cb func, void *data);
+EAPI void *ecore_coroutine_alloc(Ecore_Coroutine *coro, size_t size);
+
 /**
  * @defgroup Ecore_Pipe_Group Pipe wrapper
  *
index 5c9ad0c..3c5a101 100644 (file)
@@ -166,6 +166,7 @@ ecore_init(void)
    _ecore_glib_init();
    _ecore_job_init();
    _ecore_time_init();
+   _ecore_coroutine_init();
 
    eina_lock_new(&_thread_mutex);
    eina_condition_new(&_thread_cond, &_thread_mutex);
@@ -234,8 +235,9 @@ ecore_shutdown(void)
        }
      if (--_ecore_init_count != 0)
        goto unlock;
-   
+
      if (_ecore_fps_debug) _ecore_fps_debug_shutdown();
+     _ecore_coroutine_shutdown();
      _ecore_poller_shutdown();
      _ecore_animator_shutdown();
      _ecore_glib_shutdown();
diff --git a/src/lib/ecore/ecore_coroutine.c b/src/lib/ecore/ecore_coroutine.c
new file mode 100644 (file)
index 0000000..676141f
--- /dev/null
@@ -0,0 +1,339 @@
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <sys/time.h>
+#include <assert.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#if defined(USE_UCONTEXT)
+# include <sys/time.h>
+# include <sys/resource.h>
+# include <ucontext.h>
+#elif defined(USE_SETJMP)
+# include <sys/time.h>
+# include <sys/resource.h>
+# include <setjmp.h>
+#endif
+
+#ifdef HAVE_EVIL
+# include <Evil.h>
+#endif
+
+#ifdef _WIN32
+# define USE_FIBERS
+#endif
+
+#include "Ecore.h"
+#include "ecore_private.h"
+
+typedef struct _Ecore_Coroutine_Defer Ecore_Coroutine_Defer;
+struct _Ecore_Coroutine_Defer
+{
+   Eina_Free_Cb func;
+   void *data;
+};
+
+struct _Ecore_Coroutine
+{
+#if defined(USE_FIBERS)
+   void *context;
+#elif defined(USE_UCONTEXT)
+   ucontext_t context;
+#elif defined(USE_SETJMP)
+   jmp_buf context;
+#else
+# error "No coroutine implementation !"
+#endif
+
+   Eina_Inarray defer;
+
+   Ecore_Coroutine_Cb func;
+   void *data;
+
+   Ecore_Coroutine_State state;
+
+#ifdef USE_VALGRIND
+   int vg_stack_id;
+#endif
+   int yield_value;
+
+   unsigned char stack[1];
+};
+
+#ifdef __x86_64__
+static const int const _ecore_coroutine_default_stack_size = 16 * 1024;
+#else
+static const int const _ecore_coroutine_default_stack_size = 12 * 1024;
+#endif
+
+static void
+_ecore_coroutine_finish(Ecore_Coroutine *coro)
+{
+   int return_value = coro->func(coro->data, coro);
+
+   coro->state = ECORE_COROUTINE_FINISHED;
+   ecore_coroutine_yield(coro, return_value);
+}
+
+#if defined(USE_UCONTEXT)
+# ifdef __x86_64__
+union ptr_splitter {
+   void *ptr;
+   uint32_t part[sizeof(void *) / sizeof(uint32_t)];
+};
+
+static void
+_ecore_coroutine_entry_point(uint32_t part0, uint32_t part1)
+{
+   union ptr_splitter p = {
+     .part = { part0, part1 }
+   };
+   Ecore_Coroutine *coro = p.ptr;
+
+   _ecore_coroutine_finish(coro);
+}
+# else
+static void
+_ecore_coroutine_entry_point(Ecore_Coroutine *coro)
+{
+   _ecore_coroutine_finish(coro);
+}
+# endif
+#else
+static void
+_ecore_coroutine_entry_point(Ecore_Coroutine *coro)
+{
+   _ecore_coroutine_finish(coro);
+}
+# if defined(USE_SETJMP)
+static void
+_ecore_coroutine_setjmp(Ecore_Coroutine *coro)
+{
+   setjmp(coro->context);
+
+   /* The idea of this trick come from libcoroutine */
+   /* __jmpbuf[6] == stack pointer */
+   /* __jmpbuf[7] == program counter */
+   self->env[0].__jmpbuf[6] = ((uintptr_t)(&coro->stack));
+   self->env[0].__jmpbuf[7] = ((uintptr_t)_ecore_coroutine_entry_point);
+}
+# endif
+#endif
+
+#if defined(USE_FIBERS)
+static void *caller;
+static void *callee;
+#elif defined(USE_UCONTEXT)
+static ucontext_t caller;
+static ucontext_t callee;
+#elif defined(USE_SETJMP)
+static jmp_buf caller;
+static jmp_buf callee;
+#endif
+
+void
+_ecore_coroutine_init(void)
+{
+#if defined(USE_FIBERS)
+   caller = GetCurrentFiber();
+   if (caller == (LPVOID) 0x1e00)
+     {
+        caller = ConvertThreadToFiber(NULL);
+     }
+#else
+   memset(&caller, 0, sizeof (caller));
+   memset(&callee, 0, sizeof (callee));
+#endif
+}
+
+void
+_ecore_coroutine_shutdown(void)
+{
+#ifdef USE_FIBERS
+   ConvertFiberToThread();
+#endif
+
+   // FIXME: should we track lost coroutine ?
+}
+
+EAPI Ecore_Coroutine *
+ecore_coroutine_add(int stack_size, Ecore_Coroutine_Cb func, void *data)
+{
+   Ecore_Coroutine *coro;
+   unsigned char *stack;
+
+   if (stack_size <= 0)
+     {
+#if defined(USE_UCONTEXT) || defined(USE_SETJMP)
+        struct rlimit check;
+
+        if (getrlimit(RLIMIT_STACK, &check))
+          check.rlim_cur = _ecore_coroutine_default_stack_size;
+        stack_size = check.rlim_cur;
+#elif defined(USE_FIBERS)
+        stack_size = _ecore_coroutine_default_stack_size;
+#endif
+        if (stack_size < _ecore_coroutine_default_stack_size)
+          stack_size = _ecore_coroutine_default_stack_size;
+     }
+
+   coro = malloc(sizeof (Ecore_Coroutine) + stack_size - 1);
+   if (!coro) return NULL;
+
+   stack = coro->stack;
+
+#ifdef USE_VALGRIND
+   coro->vg_stack_id = VALGRIND_STACK_REGISTER(stack, stack + stack_size);
+#endif
+
+   coro->state = ECORE_COROUTINE_NEW;
+   coro->func = func;
+   coro->data = data;
+   eina_inarray_step_set(&coro->defer,
+                         sizeof (Eina_Inarray), sizeof (Ecore_Coroutine_Defer),
+                         8);
+
+#if defined(USE_UCONTEXT)
+   getcontext(&coro->context);
+
+   coro->context.uc_stack.ss_sp = stack;
+   coro->context.uc_stack.ss_size = stack_size;
+   coro->context.uc_stack.ss_flags = 0;
+   coro->context.uc_link = NULL;
+
+# ifdef __x86_64__
+   union ptr_splitter p = { .ptr = coro };
+   makecontext(&coro->context, (void (*)())_ecore_coroutine_entry_point,
+               2, p.part[0], p.part[1]);
+# else
+   makecontext(&coro->context, (void (*)())_ecore_coroutine_entry_point,
+               1, coro);
+# endif
+#elif defined(USE_FIBERS)
+   coro->context = CreateFiber(stack_size,
+                               (LPFIBER_START_ROUTINE)_ecore_coroutine_entry_point,
+                               coro);
+   if (!coro->context)
+     {
+        free(coro);
+        return NULL;
+     }
+#elif defined(USE_SETJMP)
+   /* We use an intermediate function call to setup the stack with the right arguments */
+   _ecore_coroutine_setjmp(coro);
+#endif
+
+   return coro;
+}
+
+EAPI void *
+ecore_coroutine_del(Ecore_Coroutine *coro)
+{
+   void *data;
+
+   data = coro->data;
+
+   while (eina_inarray_count(&coro->defer))
+     {
+        Ecore_Coroutine_Defer *defer;
+
+        defer = eina_inarray_pop(&coro->defer);
+        defer->func(defer->data);
+     }
+   eina_inarray_flush(&coro->defer);
+
+#ifdef USE_VALGRIND
+   VALGRIND_STACK_DEREGISTER(coro->vg_stack_id);
+#endif
+
+#ifdef USE_FIBERS
+   DeleteFiber(coro->context);
+#endif
+
+   free(coro);
+   return data;
+}
+
+EAPI int
+ecore_coroutine_resume(Ecore_Coroutine *coro)
+{
+#if defined(USE_FIBERS)
+   void *prev_caller;
+#elif defined(USE_UCONTEXT)
+   ucontext_t prev_caller;
+#elif defined(USE_SETJMP)
+   jmp_buf prev_caller;
+#endif
+
+   if (coro->state == ECORE_COROUTINE_FINISHED)
+     return 0;
+   coro->state = ECORE_COROUTINE_RUNNING;
+
+   prev_caller = caller;
+#if defined(USE_FIBERS)
+   SwitchToFiber(coro->context);
+#elif defined(USE_UCONTEXT)
+   swapcontext(&caller, &coro->context);
+#elif defined(USE_SETJMP)
+   setjmp(caller);
+   longjmp(coro->context);
+#endif
+#ifndef USE_FIBERS
+   // As fiber do handle the callee stack for us, no need here
+   coro->context = callee;
+#endif
+   caller = prev_caller;
+
+   return coro->yield_value;
+}
+
+EAPI void
+ecore_coroutine_yield(Ecore_Coroutine *coro, int value)
+{
+   coro->yield_value = value;
+#if defined(USE_FIBERS)
+   SwitchToFiber(caller);
+#elif defined(USE_UCONTEXT)
+   swapcontext(&callee, &caller);
+#elif defined(USE_SETJMP)
+   setjmp(callee);
+   longjmp(caller);
+#endif
+}
+
+EAPI void *
+ecore_coroutine_data_get(Ecore_Coroutine *coro)
+{
+   return coro->data;
+}
+
+EAPI Ecore_Coroutine_State
+ecore_coroutine_state_get(Ecore_Coroutine *coro)
+{
+   return coro->state;
+}
+
+EAPI void
+ecore_coroutine_defer(Ecore_Coroutine *coro, Eina_Free_Cb func, void *data)
+{
+   Ecore_Coroutine_Defer *defer;
+
+   defer = eina_inarray_grow(&coro->defer, 1);
+   defer->func = func;
+   defer->data = data;
+}
+
+EAPI void *
+ecore_coroutine_alloc(Ecore_Coroutine *coro, size_t size)
+{
+   void *data;
+
+   data = malloc(size);
+   ecore_coroutine_defer(coro, free, data);
+   return data;
+}
+
+
index b9c11b5..1c7f53e 100644 (file)
@@ -238,6 +238,9 @@ void _ecore_job_shutdown(void);
 void _ecore_main_loop_init(void);
 void _ecore_main_loop_shutdown(void);
 
+void _ecore_coroutine_init(void);
+void _ecore_coroutine_shutdown(void);
+
 void _ecore_throttle(void);
 
 void _ecore_main_call_flush(void);
index 5c631a3..090a1e4 100644 (file)
@@ -25,6 +25,7 @@ static const Ecore_Test_Case etc[] = {
 #if HAVE_ECORE_AUDIO
   { "Ecore Audio", ecore_test_ecore_audio},
 #endif
+  { "Ecore_Coroutine", ecore_test_coroutine },
   { NULL, NULL }
 };
 
index 6473d2a..80aee40 100644 (file)
@@ -8,6 +8,6 @@ void ecore_test_ecore_con(TCase *tc);
 void ecore_test_ecore_x(TCase *tc);
 void ecore_test_ecore_imf(TCase *tc);
 void ecore_test_ecore_audio(TCase *tc);
-
+void ecore_test_coroutine(TCase *tc);
 
 #endif /* _ECORE_SUITE_H */
diff --git a/src/tests/ecore/ecore_test_coroutine.c b/src/tests/ecore/ecore_test_coroutine.c
new file mode 100644 (file)
index 0000000..e6c200a
--- /dev/null
@@ -0,0 +1,86 @@
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <stdio.h>
+#include <unistd.h>
+
+#include <Eina.h>
+#include <Ecore.h>
+
+#include "ecore_suite.h"
+
+typedef struct _Ecore_Coroutine_Test Ecore_Coroutine_Test;
+struct _Ecore_Coroutine_Test
+{
+   int v;
+   int s;
+};
+
+static Ecore_Coroutine_Test t1;
+
+static int
+_ecore_test_v1(void *data, Ecore_Coroutine *coro)
+{
+   int *t = data;
+
+   ecore_coroutine_yield(coro, 7);
+   t1.v = *t;
+   ecore_coroutine_yield(coro, 42);
+   t1.s = *t;
+
+   return 0xDEADBEEF;
+}
+
+static int
+_ecore_test_v2(void *data, Ecore_Coroutine *coro EINA_UNUSED)
+{
+   int *t = data;
+
+   ecore_coroutine_yield(coro, 42);
+   t1.v = *t;
+   ecore_coroutine_yield(coro, 7);
+   t1.s = *t;
+
+   return 0xDEADBEEF;
+}
+
+START_TEST(ecore_test_coroutine_simple)
+{
+   Ecore_Coroutine *coro1;
+   Ecore_Coroutine *coro2;
+   int value[] = { 7, 42, 0xDEADBEEF };
+
+   ecore_init();
+
+   t1.v = 0xDEADBEEF; t1.s = 0xDEADBEEF;
+
+   coro1 = ecore_coroutine_add(0, _ecore_test_v1, &value[0]);
+   coro2 = ecore_coroutine_add(4*1024*1024, _ecore_test_v2, &value[1]);
+
+   fail_if(ecore_coroutine_state_get(coro1) != ECORE_COROUTINE_NEW);
+   fail_if(ecore_coroutine_resume(coro1) != 7);
+   fail_if(t1.v != 0xDEADBEEF || t1.s != 0xDEADBEEF);
+   fail_if(ecore_coroutine_resume(coro2) != 42);
+   fail_if(t1.v != 0xDEADBEEF || t1.s != 0xDEADBEEF);
+   fail_if(ecore_coroutine_resume(coro1) != 42);
+   fail_if(t1.v != value[0] || t1.s != 0xDEADBEEF);
+   fail_if(ecore_coroutine_resume(coro1) != 0xDEADBEEF);
+   fail_if(t1.v != value[0] || t1.s != value[0]);
+   fail_if(ecore_coroutine_resume(coro2) != 7);
+   fail_if(t1.v != value[1] || t1.s != value[0]);
+   fail_if(ecore_coroutine_resume(coro2) != 0xDEADBEEF);
+   fail_if(t1.v != value[1] || t1.s != value[1]);
+   fail_if(ecore_coroutine_state_get(coro2) != ECORE_COROUTINE_FINISHED);
+
+   fail_if(ecore_coroutine_del(coro1) != &value[0]);
+   fail_if(ecore_coroutine_del(coro2) != &value[1]);
+
+   ecore_shutdown();
+}
+END_TEST
+
+void ecore_test_coroutine(TCase *tc)
+{
+  tcase_add_test(tc, ecore_test_coroutine_simple);
+}