agent: Support invoking I/O callbacks in non-default contexts
authorPhilip Withnall <philip.withnall@collabora.co.uk>
Mon, 16 Dec 2013 13:43:35 +0000 (13:43 +0000)
committerOlivier Crête <olivier.crete@collabora.com>
Fri, 31 Jan 2014 06:48:58 +0000 (01:48 -0500)
If the Component’s I/O receiver machinery is invoked from a thread which
can’t acquire the main context specified for the I/O callbacks, the
callbacks need to be queued as idle handlers in that main context.

This is needed for the case where blocking reads are being performed in
one thread, with their callbacks needing to be delivered in another
thread.

This introduces a new fine-grained lock to Component: io_mutex. This
protects accesses to Component->io_callback, Component->io_user_data and
Component->pending_io_callbacks. If being locked at the same time as the
main agent lock, it must always be locked afterwards, but the agent lock
does not *have* to be held in order to lock io_mutex.

agent/agent.c
agent/component.c
agent/component.h

index e58f634..95e6a44 100644 (file)
@@ -1043,7 +1043,9 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
   g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent);
 
   do {
-    if (component->io_callback != NULL)
+    gboolean has_io_callback = component_has_io_callback (component);
+
+    if (has_io_callback)
       len = pseudo_tcp_socket_recv (sock, (gchar *) buf, sizeof(buf));
     else
       len = 0;
@@ -2696,7 +2698,7 @@ nice_agent_g_source_cb (
     nice_debug ("Agent %p: unable to recv from socket %p. Detaching",
         ctx->agent, ctx->socket);
     component_detach_socket_source (component, ctx->socket);
-  } else if (len > 0 && component->io_callback) {
+  } else if (len > 0) {
     component_emit_io_callback (component, buf, len);
   }
 
index 0a58284..a70e9e2 100644 (file)
 
 
 static void
+component_schedule_io_callback (Component *component);
+static void
+component_deschedule_io_callback (Component *component);
+
+
+/* Must *not* take the agent lock, since it’s called from within
+ * component_set_io_callback(), which holds the Component’s I/O lock. */
+static void
 socket_source_detach (SocketSource *source)
 {
   if (source->source != NULL) {
@@ -73,7 +81,6 @@ socket_source_free (SocketSource *source)
   g_slice_free (SocketSource, source);
 }
 
-
 Component *
 component_new (guint id, NiceAgent *agent, Stream *stream)
 {
@@ -87,6 +94,16 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
   component->agent = agent;
   component->stream = stream;
 
+  g_mutex_init (&component->io_mutex);
+  g_queue_init (&component->pending_io_messages);
+  component->io_callback_id = 0;
+
+  /* Start off with a fresh main context and all I/O paused. This
+   * will be updated when nice_agent_attach_recv() or nice_agent_recv() are
+   * called. */
+  component_set_io_context (component, NULL);
+  component_set_io_callback (component, NULL, NULL);
+
   return component;
 }
 
@@ -95,6 +112,7 @@ component_free (Component *cmp)
 {
   GSList *i;
   GList *item;
+  IOCallbackData *data;
 
   for (i = cmp->local_candidates; i; i = i->next) {
     NiceCandidate *candidate = i->data;
@@ -146,11 +164,18 @@ component_free (Component *cmp)
     cmp->tcp = NULL;
   }
 
+  while ((data = g_queue_pop_head (&cmp->pending_io_messages)) != NULL)
+    io_callback_data_free (data);
+
+  component_deschedule_io_callback (cmp);
+
   if (cmp->ctx != NULL) {
     g_main_context_unref (cmp->ctx);
     cmp->ctx = NULL;
   }
 
+  g_mutex_clear (&cmp->io_mutex);
+
   g_slice_free (Component, cmp);
 }
 
@@ -425,6 +450,9 @@ component_detach_socket_source (Component *component, NiceSocket *socket)
 /*
  * Detaches socket handles of @component from the main context. Leaves the
  * sockets themselves untouched.
+ *
+ * Must *not* take the agent lock, since it’s called from within
+ * component_set_io_callback(), which holds the Component’s I/O lock.
  */
 void
 component_detach_socket_sources (Component *component)
@@ -450,6 +478,8 @@ void
 component_set_io_callback (Component *component, NiceAgentRecvFunc func,
     gpointer user_data, GMainContext *context)
 {
+  g_mutex_lock (&component->io_mutex);
+
   /* Reference the context early so we don’t accidentally free it below. */
   if (context != NULL && func != NULL)
     g_main_context_ref (context);
@@ -468,7 +498,117 @@ component_set_io_callback (Component *component, NiceAgentRecvFunc func,
     component->io_callback = func;
     component->io_user_data = user_data;
     component->ctx = context;  /* referenced above */
+
+    component_schedule_io_callback (component);
+  } else {
+    component_deschedule_io_callback (component);
   }
+
+  g_mutex_unlock (&component->io_mutex);
+}
+
+gboolean
+component_has_io_callback (Component *component)
+{
+  gboolean has_io_callback;
+
+  g_mutex_lock (&component->io_mutex);
+  has_io_callback = (component->io_callback != NULL);
+  g_mutex_unlock (&component->io_mutex);
+
+  return has_io_callback;
+}
+
+IOCallbackData *
+io_callback_data_new (const guint8 *buf, gsize buf_len)
+{
+  IOCallbackData *data;
+
+  data = g_slice_new0 (IOCallbackData);
+  data->buf = g_memdup (buf, buf_len);
+  data->buf_len = buf_len;
+  data->offset = 0;
+
+  return data;
+}
+
+void
+io_callback_data_free (IOCallbackData *data)
+{
+  g_free (data->buf);
+  g_slice_free (IOCallbackData, data);
+}
+
+/* This is called with the global agent lock released. It does not take that
+ * lock, but does take the io_mutex. */
+static gboolean
+emit_io_callback_cb (gpointer user_data)
+{
+  Component *component = user_data;
+  IOCallbackData *data;
+  NiceAgentRecvFunc io_callback;
+  gpointer io_user_data;
+  guint stream_id, component_id;
+  NiceAgent *agent;
+
+  agent = component->agent;
+  g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+
+  stream_id = component->stream->id;
+  component_id = component->id;
+
+  g_mutex_lock (&component->io_mutex);
+
+  /* The members of Component are guaranteed not to have changed since this
+   * GSource was attached in component_emit_io_callback(). The Component’s agent
+   * and stream are immutable after construction, as are the stream and
+   * component IDs. The callback and its user data may have changed, but are
+   * guaranteed to be non-%NULL at the start as the idle source is removed when
+   * the callback is set to %NULL. They may become %NULL during the io_callback,
+   * so must be re-checked every loop iteration. The data buffer is copied into
+   * the #IOCallbackData closure.
+   *
+   * If the component is destroyed (which happens if the agent or stream are
+   * destroyed) between attaching the GSource and firing it, the GSource is
+   * detached in component_free() and this callback is never invoked. If the
+   * agent is destroyed during an io_callback, its weak pointer will be
+   * nullified. Similarly, the Component needs to be re-queried for after every
+   * iteration, just in case the client has removed the stream in the
+   * callback. */
+  while (TRUE) {
+    io_callback = component->io_callback;
+    io_user_data = component->io_user_data;
+    data = g_queue_peek_head (&component->pending_io_messages);
+
+    if (data == NULL || io_callback == NULL)
+      break;
+
+    g_mutex_unlock (&component->io_mutex);
+
+    io_callback (agent, stream_id, component_id,
+        data->buf_len - data->offset, (gchar *) data->buf + data->offset,
+        io_user_data);
+
+    /* Check for the user destroying things underneath our feet. */
+    if (agent == NULL ||
+        !agent_find_component (agent, stream_id, component_id,
+            NULL, &component)) {
+      nice_debug ("%s: Agent or component destroyed.", G_STRFUNC);
+      return G_SOURCE_REMOVE;
+    }
+
+    g_queue_pop_head (&component->pending_io_messages);
+    io_callback_data_free (data);
+
+    g_mutex_lock (&component->io_mutex);
+  }
+
+  component->io_callback_id = 0;
+  g_mutex_unlock (&component->io_mutex);
+
+  g_object_remove_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+
+  return G_SOURCE_REMOVE;
 }
 
 /* This must be called with the agent lock *held*. */
@@ -488,18 +628,80 @@ component_emit_io_callback (Component *component,
   agent = component->agent;
   stream_id = component->stream->id;
   component_id = component->id;
+
+  g_mutex_lock (&component->io_mutex);
   io_callback = component->io_callback;
   io_user_data = component->io_user_data;
+  g_mutex_unlock (&component->io_mutex);
+
+  /* Allow this to be called with a NULL io_callback, since the caller can’t
+   * lock io_mutex to check beforehand. */
+  if (io_callback == NULL)
+    return;
 
   g_assert (NICE_IS_AGENT (agent));
   g_assert (stream_id > 0);
   g_assert (component_id > 0);
   g_assert (io_callback != NULL);
 
-  agent_unlock ();
+  /* Only allocate a closure if the callback is being deferred to an idle
+   * handler. */
+  if (g_main_context_is_owner (component->ctx)) {
+    /* Thread owns the main context, so invoke the callback directly. */
+    agent_unlock ();
+    io_callback (agent, stream_id,
+        component_id, buf_len, (gchar *) buf, io_user_data);
+    agent_lock ();
+  } else {
+    IOCallbackData *data;
+
+    g_mutex_lock (&component->io_mutex);
+
+    /* Slow path: Current thread doesn’t own the Component’s context at the
+     * moment, so schedule the callback in an idle handler. */
+    data = io_callback_data_new (buf, buf_len);
+    g_queue_push_tail (&component->pending_io_messages,
+        data);  /* transfer ownership */
+
+    nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
+
+    component_schedule_io_callback (component);
+
+    g_mutex_unlock (&component->io_mutex);
+  }
+}
+
+/* Note: Must be called with the io_mutex held. */
+static void
+component_schedule_io_callback (Component *component)
+{
+  GSource *source;
 
-  io_callback (agent, stream_id, component_id,
-      buf_len, (gchar *) buf, io_user_data);
+  /* Already scheduled or nothing to schedule? */
+  if (component->io_callback_id != 0 ||
+      g_queue_is_empty (&component->pending_io_messages))
+    return;
+
+  /* Add the idle callback. If nice_agent_attach_recv() is called with a
+   * NULL callback before this source is dispatched, the source will be
+   * destroyed, but any pending data will remain in
+   * component->pending_io_messages, ready to be picked up when a callback
+   * is re-attached, or if nice_agent_recv() is called. */
+  source = g_idle_source_new ();
+  g_source_set_priority (source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (source, emit_io_callback_cb, component, NULL);
+  component->io_callback_id = g_source_attach (source, component->ctx);
+  g_source_unref (source);
+}
+
+/* Note: Must be called with the io_mutex held. */
+static void
+component_deschedule_io_callback (Component *component)
+{
+  /* Already descheduled? */
+  if (component->io_callback_id == 0)
+    return;
 
-  agent_lock ();
+  g_source_remove (component->io_callback_id);
+  component->io_callback_id = 0;
 }
index 19b2536..d59ad4b 100644 (file)
@@ -105,6 +105,31 @@ typedef struct {
   GSource *source;
 } SocketSource;
 
+
+/* A buffer of data which has been received and processed (so is guaranteed not
+ * to be a STUN packet, or to contain pseudo-TCP header bytes, for example), but
+ * which hasn’t yet been sent to the client in an I/O callback. This could be
+ * due to the main context not being run, or due to the I/O callback being
+ * detached.
+ *
+ * The @offset member gives the byte offset into @buf which has already been
+ * sent to the client. #IOCallbackData buffers remain in the
+ * #Component::pending_io_messages queue until all of their bytes have been sent
+ * to the client.
+ *
+ * @offset is guaranteed to be smaller than @buf_len. */
+typedef struct {
+  guint8 *buf;  /* owned */
+  gsize buf_len;
+  gsize offset;
+} IOCallbackData;
+
+IOCallbackData *
+io_callback_data_new (const guint8 *buf, gsize buf_len);
+void
+io_callback_data_free (IOCallbackData *data);
+
+
 struct _Component
 {
   NiceComponentType type;
@@ -119,13 +144,27 @@ struct _Component
                                    see ICE 11.1. "Sending Media" (ID-19) */
   NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */
 
+  GMutex io_mutex;                  /**< protects io_callback, io_user_data,
+                                         pending_io_messages and io_callback_id.
+                                         immutable: can be accessed without
+                                         holding the agent lock; if the agent
+                                         lock is to be taken, it must always be
+                                         taken before this one */
   NiceAgentRecvFunc io_callback;    /**< function called on io cb */
   gpointer io_user_data;            /**< data passed to the io function */
   GMainContext *ctx;                /**< context for GSources for this
                                        component */
-
-  NiceAgent *agent;  /* unowned */
-  Stream *stream;  /* unowned */
+  GQueue pending_io_messages;       /**< queue of packets which have been
+                                         received but not passed to the client
+                                         in an I/O callback or recv() call yet.
+                                         each element is an owned
+                                         IOCallbackData */
+  guint io_callback_id;             /* GSource ID of the I/O callback */
+
+  NiceAgent *agent;  /* unowned, immutable: can be accessed without holding the
+                      * agent lock */
+  Stream *stream;  /* unowned, immutable: can be accessed without holding the
+                    * agent lock */
 
   PseudoTcpSocket *tcp;
   GSource* tcp_clock;
@@ -177,6 +216,9 @@ void
 component_emit_io_callback (Component *component,
     const guint8 *buf, gsize buf_len);
 
+gboolean
+component_has_io_callback (Component *component);
+
 G_END_DECLS
 
 #endif /* _NICE_COMPONENT_H */