agent: Add GPollableOutputStream support to NiceOutputStream
authorPhilip Withnall <philip.withnall@collabora.co.uk>
Tue, 17 Dec 2013 10:30:19 +0000 (10:30 +0000)
committerOlivier Crête <olivier.crete@collabora.com>
Fri, 31 Jan 2014 06:48:59 +0000 (01:48 -0500)
agent/agent.h
agent/outputstream.c

index 8b4c974..649e715 100644 (file)
@@ -1131,6 +1131,8 @@ nice_agent_parse_remote_candidate_sdp (
  *
  * Build a #GIOStream wrapper around the given stream and component in
  * @agent. The I/O stream will be valid for as long as @stream_id is valid.
+ * The #GInputStream and #GOutputStream implement #GPollableInputStream and
+ * #GPollableOutputStream.
  *
  * This function may only be called on reliable #NiceAgents. It is an error to
  * try and create an I/O stream wrapper for an unreliable stream.
index 85b29e2..839c36c 100644 (file)
 # include "config.h"
 #endif
 
+#include <errno.h>
+
 #include "outputstream.h"
+#include "agent-priv.h"
 
+static void nice_output_stream_init_pollable (
+    GPollableOutputStreamInterface *iface);
 static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
     gpointer user_data);
 
-G_DEFINE_TYPE (NiceOutputStream,
-              nice_output_stream, G_TYPE_OUTPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (NiceOutputStream,
+                         nice_output_stream, G_TYPE_OUTPUT_STREAM,
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+                                                nice_output_stream_init_pollable));
 
 enum
 {
@@ -96,7 +103,12 @@ static void nice_output_stream_set_property (GObject *object, guint prop_id,
 static gssize nice_output_stream_write (GOutputStream *stream,
     const void *buffer, gsize count, GCancellable *cancellable, GError **error);
 
-
+static gboolean nice_output_stream_is_writable (GPollableOutputStream *stream);
+static gssize nice_output_stream_write_nonblocking (
+    GPollableOutputStream *stream, const void *buffer, gsize count,
+    GError **error);
+static GSource *nice_output_stream_create_source (GPollableOutputStream *stream,
+    GCancellable *cancellable);
 
 /* Output Stream */
 static void
@@ -248,6 +260,14 @@ nice_output_stream_init (NiceOutputStream *stream)
   g_weak_ref_init (&stream->priv->agent_ref, NULL);
 }
 
+static void
+nice_output_stream_init_pollable (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable = nice_output_stream_is_writable;
+  iface->write_nonblocking = nice_output_stream_write_nonblocking;
+  iface->create_source = nice_output_stream_create_source;
+}
+
 /**
  * nice_output_stream_new:
  * @agent: A #NiceAgent
@@ -401,6 +421,148 @@ done:
   return len;
 }
 
+static gboolean
+nice_output_stream_is_writable (GPollableOutputStream *stream)
+{
+  NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
+  Component *component = NULL;
+  Stream *_stream = NULL;
+  gboolean retval = FALSE;
+  GSList *i;
+  NiceAgent *agent;  /* owned */
+
+  /* Closed streams are not writeable. */
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
+    return FALSE;
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL)
+    return FALSE;
+
+  agent_lock ();
+
+  if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+          &_stream, &component)) {
+    g_warning ("Could not find component %u in stream %u", priv->component_id,
+        priv->stream_id);
+    goto done;
+  }
+
+  /* If it’s a reliable agent, see if there’s any space in the pseudo-TCP output
+   * buffer. */
+  if (agent->reliable && component->tcp != NULL &&
+      pseudo_tcp_socket_can_send (component->tcp)) {
+    retval = TRUE;
+    goto done;
+  }
+
+  /* Check whether any of the component’s FDs are pollable. */
+  for (i = component->socket_sources; i != NULL; i = i->next) {
+    SocketSource *socket_source = i->data;
+    NiceSocket *socket = socket_source->socket;
+
+    if (g_socket_condition_check (socket->fileno, G_IO_OUT) != 0) {
+      retval = TRUE;
+      break;
+    }
+  }
+
+done:
+  agent_unlock ();
+
+  g_object_unref (agent);
+
+  return retval;
+}
+
+static gssize
+nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
+    const void *buffer, gsize count, GError **error)
+{
+  NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
+  NiceAgent *agent;  /* owned */
+  gssize len;
+
+  /* Closed streams are not writeable. */
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed.");
+    return -1;
+  }
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed due to the NiceAgent being finalised.");
+    return -1;
+  }
+
+  if (count == 0)
+    return 0;
+
+  /* This is equivalent to the default GPollableOutputStream implementation. */
+  if (!g_pollable_output_stream_is_writable (stream)) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+        g_strerror (EAGAIN));
+    return -1;
+  }
+
+  len = nice_agent_send_full (agent, priv->stream_id, priv->component_id,
+      buffer, count, NULL, error);
+
+  g_object_unref (agent);
+
+  return len;
+}
+
+static GSource *
+nice_output_stream_create_source (GPollableOutputStream *stream,
+    GCancellable *cancellable)
+{
+  NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
+  GSource *component_source = NULL;
+  Component *component = NULL;
+  Stream *_stream = NULL;
+  NiceAgent *agent;  /* owned */
+
+  /* Closed streams cannot have sources. */
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
+    return g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL)
+    return g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+
+  agent_lock ();
+
+  /* Grab the socket for this component. */
+  if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+          &_stream, &component)) {
+    g_warning ("Could not find component %u in stream %u", priv->component_id,
+        priv->stream_id);
+    component_source = g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+    goto done;
+  }
+
+  /* Note: We need G_IO_IN here to handle pseudo-TCP streams. If our TCP
+   * transmit buffer is full, but the kernel's receive buffer has pending ACKs
+   * sitting in it, we need to receive those ACKs so we can transmit the head
+   * bytes in the transmit buffer, and hence free up space in the tail of the
+   * buffer so the stream is writeable again. */
+  component_source = component_source_new (component, G_OBJECT (stream),
+      G_IO_IN | G_IO_OUT, cancellable);
+
+done:
+  agent_unlock ();
+
+  g_object_unref (agent);
+
+  return component_source;
+}
+
 static void
 streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
 {