common: added support for externally pumped mainloops (superloops).
authorKrisztian Litkey <kli@iki.fi>
Fri, 8 Jun 2012 22:16:09 +0000 (01:16 +0300)
committerKrisztian Litkey <kli@iki.fi>
Fri, 8 Jun 2012 22:16:09 +0000 (01:16 +0300)
Murphy mainloop has now an abstraction - called a superloop - for
external mainloops that pump the murphy mainloop. This abstraction
layer defines the set of functions one needs to implement to integrate
the native murphy mainloop to an external mainloop. Practically this
means implementing functios for

  - adding/deleting file descriptor I/O watches,
  - adding/deleting/modifying timers, and
  - adding/deleting/enabling/disabling deferred callbacks

To integrate murphy to any external mainloop one needs to

   - implement these functions using the external mainloop, and
   - register these functions as the superloops API to murphy
     (using mrp_set_superloop)

After this whenever the superloop is iterated/run, the murphy mainloop
will be iterated/run as well.

src/common/mainloop.c
src/common/mainloop.h

index 8887be4..4a77da8 100644 (file)
@@ -107,18 +107,18 @@ typedef struct {
  */
 
 struct mrp_subloop_s {
-    mrp_list_hook_t     hook;                    /* to list of subloops */
-    int               (*free)(void *ptr);        /* cb to free memory */
-    mrp_subloop_ops_t  *cb;                      /* subloop glue callbacks */
-    void               *user_data;               /* opaque subloop data */
-    int                 epollfd;                 /* epollfd for this subloop */
-    struct epoll_event *events;                  /* epoll event buffer */
-    int                 nevent;                  /* epoll event buffer size */
-    mrp_io_watch_t     *w;                       /* watch for epollfd */
-    struct pollfd      *pollfds;                 /* pollfds for this subloop */
-    int                 npollfd;                 /* number of pollfds */
-    int                 pending;                 /* pending events */
-    int                 poll;                    /* need to poll for events */
+    mrp_list_hook_t      hook;                   /* to list of subloops */
+    int                (*free)(void *ptr);       /* cb to free memory */
+    mrp_subloop_ops_t   *cb;                     /* subloop glue callbacks */
+    void                *user_data;              /* opaque subloop data */
+    int                  epollfd;                /* epollfd for this subloop */
+    struct epoll_event  *events;                 /* epoll event buffer */
+    int                  nevent;                 /* epoll event buffer size */
+    mrp_io_watch_t      *w;                      /* watch for epollfd */
+    struct pollfd       *pollfds;                /* pollfds for this subloop */
+    int                  npollfd;                /* number of pollfds */
+    int                  pending;                /* pending events */
+    int                  poll;                   /* need to poll for events */
 };
 
 
@@ -127,33 +127,48 @@ struct mrp_subloop_s {
  * main loop
  */
 
+typedef enum {
+    MRP_SUPER_EVENT_NONE  = 0x0,
+    MRP_SUPER_EVENT_IO    = 0x1,
+    MRP_SUPER_EVENT_TIMER = 0x2,
+    MRP_SUPER_EVENT_DEFER = 0x4,
+} super_event_t;
+
+
 struct mrp_mainloop_s {
-    int                 epollfd;                 /* our epoll descriptor */
-    struct epoll_event *events;                  /* epoll event buffer */
-    int                 nevent;                  /* epoll event buffer size */
+    int                  epollfd;                /* our epoll descriptor */
+    struct epoll_event  *events;                 /* epoll event buffer */
+    int                  nevent;                 /* epoll event buffer size */
 
-    mrp_list_hook_t     iowatches;               /* list of I/O watches */
-    int                 niowatch;                /* number of I/O watches */
+    mrp_list_hook_t      iowatches;              /* list of I/O watches */
+    int                  niowatch;               /* number of I/O watches */
 
-    mrp_list_hook_t     timers;                  /* list of timers */
-    mrp_timer_t        *next_timer;              /* next expiring timer */
+    mrp_list_hook_t      timers;                 /* list of timers */
+    mrp_timer_t         *next_timer;             /* next expiring timer */
 
-    mrp_list_hook_t     deferred;                /* list of deferred cbs */
-    mrp_list_hook_t     inactive_deferred;       /* inactive defferred cbs */
+    mrp_list_hook_t      deferred;               /* list of deferred cbs */
+    mrp_list_hook_t      inactive_deferred;      /* inactive defferred cbs */
 
-    int                 poll_timeout;            /* next poll timeout */
-    int                 poll_result;             /* return value from poll */
+    int                  poll_timeout;           /* next poll timeout */
+    int                  poll_result;            /* return value from poll */
 
-    int                 sigfd;                   /* signal polling fd */
-    sigset_t            sigmask;                 /* signal mask */
-    mrp_io_watch_t     *sigwatch;                /* sigfd I/O watch */
-    mrp_list_hook_t     sighandlers;             /* signal handlers */
+    int                  sigfd;                  /* signal polling fd */
+    sigset_t             sigmask;                /* signal mask */
+    mrp_io_watch_t      *sigwatch;               /* sigfd I/O watch */
+    mrp_list_hook_t      sighandlers;            /* signal handlers */
 
-    mrp_list_hook_t     subloops;                /* external main loops */
+    mrp_list_hook_t      subloops;               /* external main loops */
     
-    mrp_list_hook_t     deleted;                 /* unfreed deleted items */
-    int                 quit;                    /* TRUE if _quit called */
-    int                 exit_code;               /* returned from _run */
+    mrp_list_hook_t      deleted;                /* unfreed deleted items */
+    int                  quit;                   /* TRUE if _quit called */
+    int                  exit_code;              /* returned from _run */
+
+    mrp_superloop_ops_t *super_ops;              /* superloop options */
+    void                *super_data;             /* superloop glue data */
+    super_event_t        super_events;           /* pending superloop events  */
+    void                *iow;                    /* superloop epollfd watch */
+    void                *timer;                  /* superloop timer */
+    void                *work;                   /* superloop deferred work */
 };
 
 
@@ -674,7 +689,7 @@ void mrp_del_sighandler(mrp_sighandler_t *h)
 
 
 /*
- * external mainloops
+ * external mainloops we pump
  */
 
 static int free_subloop(void *ptr)
@@ -773,6 +788,146 @@ void mrp_del_subloop(mrp_subloop_t *sl)
 
 
 /*
+ * external mainloop that pumps us
+ */
+
+static void super_io_cb(void *super_data, void *id, int fd,
+                       mrp_io_event_t events, void *user_data)
+{
+    mrp_mainloop_t      *ml  = (mrp_mainloop_t *)user_data;
+    mrp_superloop_ops_t *ops = ml->super_ops;
+
+    MRP_UNUSED(super_data);
+    MRP_UNUSED(id);
+    MRP_UNUSED(fd);
+    MRP_UNUSED(events);
+
+    ml->super_events |= MRP_SUPER_EVENT_IO;
+    ops->mod_defer(ml->super_data, ml->work, TRUE);
+}
+
+
+static void super_timer_cb(void *super_data, void *id, void *user_data)
+{
+    mrp_mainloop_t      *ml  = (mrp_mainloop_t *)user_data;
+    mrp_superloop_ops_t *ops = ml->super_ops;
+
+    MRP_UNUSED(super_data);
+    MRP_UNUSED(id);
+
+    ml->super_events |= MRP_SUPER_EVENT_TIMER;
+    ops->mod_defer(ml->super_data, ml->work, TRUE);
+}
+
+
+static void super_work_cb(void *super_data, void *id, void *user_data)
+{
+    mrp_mainloop_t      *ml  = (mrp_mainloop_t *)user_data;
+    mrp_superloop_ops_t *ops = ml->super_ops;
+    unsigned int         timeout;
+
+    MRP_UNUSED(super_data);
+    MRP_UNUSED(id);
+
+    mrp_mainloop_poll(ml, FALSE);
+    mrp_mainloop_dispatch(ml);
+    
+    if (!ml->quit) {
+       mrp_mainloop_prepare(ml);
+
+       /*
+        * Notes:
+        *
+        *     Some mainloop abstractions (eg. the one in PulseAudio)
+        *     have deferred callbacks that starve all other event
+        *     processing until no more deferred callbacks are pending.
+        *     For this reason, we cannot map our deferred callbacks
+        *     directly to superloop deferred callbacks (in some cases
+        *     this could starve the superloop indefinitely). Hence, if
+        *     we have enabled deferred callbacks, we arm our timer with
+        *     0 timeout to let the superloop do one round of its event
+        *     processing.
+        */
+
+       timeout = mrp_list_empty(&ml->deferred) ? ml->poll_timeout : 0;
+       ops->mod_timer(ml->super_data, ml->timer, timeout);
+       ops->mod_defer(ml->super_data, ml->work, FALSE);
+    }
+    else {
+       ops->del_io(ml->super_data, ml->iow);
+       ops->del_timer(ml->super_data, ml->timer);
+       ops->del_defer(ml->super_data, ml->work);
+
+       ml->iow   = NULL;
+       ml->timer = NULL;
+       ml->work  = NULL;
+    }
+}
+
+
+int mrp_set_superloop(mrp_mainloop_t *ml, mrp_superloop_ops_t *ops,
+                     void *loop_data)
+{
+    mrp_io_event_t events;
+    int            timeout;
+
+    if (ml->super_ops == NULL) {
+       ml->super_ops  = ops;
+       ml->super_data = loop_data;
+
+       mrp_mainloop_prepare(ml);
+
+       events    = MRP_IO_EVENT_IN | MRP_IO_EVENT_OUT | MRP_IO_EVENT_HUP;
+       ml->iow   = ops->add_io(ml->super_data, ml->epollfd, events,
+                               super_io_cb, ml);
+       ml->work  = ops->add_defer(ml->super_data, super_work_cb, ml);
+
+       /*
+        * Notes:
+        *
+        *     Some mainloop abstractions (eg. the one in PulseAudio)
+        *     have deferred callbacks that starve all other event
+        *     processing until no more deferred callbacks are pending.
+        *     For this reason, we cannot map our deferred callbacks
+        *     directly to superloop deferred callbacks (in some cases
+        *     this could starve the superloop indefinitely). Hence, if
+        *     we have enabled deferred callbacks, we arm our timer with
+        *     0 timeout to let the superloop do one round of its event
+        *     processing.
+        */
+
+       timeout   = mrp_list_empty(&ml->deferred) ? ml->poll_timeout : 0;
+       ml->timer = ops->add_timer(ml->super_data, timeout, super_timer_cb, ml);
+
+       if (ml->iow != NULL && ml->timer != NULL && ml->work != NULL)
+           return TRUE;
+       else
+           mrp_clear_superloop(ml, ops, loop_data);
+    }
+
+    return FALSE;
+}
+
+
+int mrp_clear_superloop(mrp_mainloop_t *ml, mrp_superloop_ops_t *ops,
+                       void *loop_data)
+{
+    if (ml->super_ops == ops && ml->super_data == loop_data) {
+       if (ml->iow != NULL)
+           ops->del_io(ml->super_data, ml->iow);
+       
+       ml->super_ops  = NULL;
+       ml->super_data = NULL;
+
+       return TRUE;
+    }
+    else
+       return FALSE;
+}
+
+
+
+/*
  * mainloop
  */
 
@@ -1099,12 +1254,14 @@ int mrp_mainloop_prepare(mrp_mainloop_t *ml)
 }
 
 
-int mrp_mainloop_poll(mrp_mainloop_t *ml)
+ int mrp_mainloop_poll(mrp_mainloop_t *ml, int may_block)
 {
-    int n;
-   
+    int n, timeout;
+    
+    timeout = may_block ? ml->poll_timeout : 0;
+    
     if (ml->nevent > 0) {
-       n = epoll_wait(ml->epollfd, ml->events, ml->nevent, ml->poll_timeout);
+       n = epoll_wait(ml->epollfd, ml->events, ml->nevent, timeout);
 
        if (n < 0 && errno == EINTR)
            n = 0;
@@ -1116,8 +1273,8 @@ int mrp_mainloop_poll(mrp_mainloop_t *ml)
         * Notes: Practically we should never branch here because
         *     we always have at least ml->sigfd registered for epoll.
         */
-       if (ml->poll_timeout > 0)
-           usleep(ml->poll_timeout * USECS_PER_MSEC);
+       if (timeout > 0)
+           usleep(timeout * USECS_PER_MSEC);
        
        ml->poll_result = 0;
     }
@@ -1306,7 +1463,7 @@ int mrp_mainloop_iterate(mrp_mainloop_t *ml)
 {
     return
        mrp_mainloop_prepare(ml) && 
-       mrp_mainloop_poll(ml) && 
+       mrp_mainloop_poll(ml, TRUE) && 
        mrp_mainloop_dispatch(ml) &&
        !ml->quit;
 }
index 368abab..09c40b4 100644 (file)
@@ -116,6 +116,39 @@ mrp_subloop_t *mrp_add_subloop(mrp_mainloop_t *ml, mrp_subloop_ops_t *ops,
 void mrp_del_subloop(mrp_subloop_t *sl);
 
 
+/*
+ * superloops - external mainloop to pump murphy mainloops
+ */
+
+typedef struct {
+    void *(*add_io)(void *glue_data, int fd, mrp_io_event_t events,
+                   void (*cb)(void *glue_data, void *id, int fd,
+                              mrp_io_event_t events, void *user_data),
+                   void *user_data);
+    void (*del_io)(void *glue_data, void *id);
+    
+    void *(*add_timer)(void *glue_data, unsigned int msecs,
+                      void (*cb)(void *glue_data, void *id, void *user_data),
+                      void *user_data);
+    void (*del_timer)(void *glue_data, void *id);
+    void (*mod_timer)(void *glue_data, void *id, unsigned int msecs);
+
+    void *(*add_defer)(void *glue_data,
+                      void (*cb)(void *glue_data, void *id, void *user_data),
+                      void *user_data);
+    void  (*del_defer)(void *glue_data, void *id);
+    void  (*mod_defer)(void *glue_data, void *id, int enabled);
+} mrp_superloop_ops_t;
+
+
+/** Set a superloop to pump the given mainloop. */
+int mrp_set_superloop(mrp_mainloop_t *ml, mrp_superloop_ops_t *ops,
+                     void *loop_data);
+
+/** Clear the superloop that pumps the given mainloop. */
+int mrp_clear_superloop(mrp_mainloop_t *ml, mrp_superloop_ops_t *ops,
+                       void *loop_data);
+
 
 /*
  * mainloop
@@ -134,7 +167,7 @@ int mrp_mainloop_run(mrp_mainloop_t *ml);
 int mrp_mainloop_prepare(mrp_mainloop_t *ml);
 
 /** Poll the mainloop. */
-int mrp_mainloop_poll(mrp_mainloop_t *ml);
+int mrp_mainloop_poll(mrp_mainloop_t *ml, int may_block);
 
 /** Dispatch pending events of the mainloop. */
 int mrp_mainloop_dispatch(mrp_mainloop_t *ml);