souphttpsrc: reduce reading latency by using non-blocking read
authorThiago Santos <thiagoss@osg.samsung.com>
Sat, 14 May 2016 14:09:33 +0000 (11:09 -0300)
committerThiago Santos <thiagoss@osg.samsung.com>
Sun, 15 May 2016 09:36:25 +0000 (06:36 -0300)
Non-blocking read will return the amount of data available without
blocking to wait for the full requested size.

The downside is that now it souphttpsrc needs to have a waiting
mechanism in case there is no data available yet to avoid busy
looping arond the inputstream.

ext/soup/gstsouphttpsrc.c
ext/soup/gstsouphttpsrc.h

index b707d08..75707cb 100644 (file)
@@ -77,6 +77,7 @@
 #endif
 #include <gst/gstelement.h>
 #include <gst/gst-i18n-plugin.h>
+#include <gio/gio.h>
 #include <libsoup/soup.h>
 #include "gstsouphttpsrc.h"
 #include "gstsouputils.h"
@@ -175,6 +176,7 @@ static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src,
 static void gst_soup_http_src_authenticate_cb (SoupSession * session,
     SoupMessage * msg, SoupAuth * auth, gboolean retrying,
     GstSoupHTTPSrc * src);
+static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
 
 #define gst_soup_http_src_parent_class parent_class
 G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
@@ -439,10 +441,7 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
 
   src->ret = GST_FLOW_OK;
   g_cancellable_reset (src->cancellable);
-  if (src->input_stream) {
-    g_object_unref (src->input_stream);
-    src->input_stream = NULL;
-  }
+  gst_soup_http_src_destroy_input_stream (src);
 
   gst_caps_replace (&src->src_caps, NULL);
   g_free (src->iradio_name);
@@ -461,6 +460,7 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
   g_mutex_init (&src->mutex);
   g_cond_init (&src->have_headers_cond);
   src->cancellable = g_cancellable_new ();
+  src->poll_context = g_main_context_new ();
   src->location = NULL;
   src->redirection_uri = NULL;
   src->automatic_redirect = TRUE;
@@ -515,6 +515,7 @@ gst_soup_http_src_finalize (GObject * gobject)
   g_mutex_clear (&src->mutex);
   g_cond_clear (&src->have_headers_cond);
   g_object_unref (src->cancellable);
+  g_main_context_unref (src->poll_context);
   g_free (src->location);
   g_free (src->redirection_uri);
   g_free (src->user_agent);
@@ -765,6 +766,21 @@ gst_soup_http_src_unicodify (const gchar * str)
 }
 
 static void
+gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
+{
+  if (src->input_stream) {
+    if (src->poll_source) {
+      g_source_destroy (src->poll_source);
+      g_source_unref (src->poll_source);
+      src->poll_source = NULL;
+    }
+    g_input_stream_close (src->input_stream, src->cancellable, NULL);
+    g_object_unref (src->input_stream);
+    src->input_stream = NULL;
+  }
+}
+
+static void
 gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
 {
   g_cancellable_cancel (src->cancellable);
@@ -1355,11 +1371,25 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
   return TRUE;
 }
 
+static void
+gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
+{
+  if (!src->input_stream)
+    return;
+
+  src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
+      && g_pollable_input_stream_can_poll ((GPollableInputStream *)
+      src->input_stream);
+}
+
 static GstFlowReturn
 gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
 {
   g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
 
+  g_assert (src->input_stream == NULL);
+  g_assert (src->poll_source == NULL);
+
   /* FIXME We are ignoring the GError here, might be useful to debug */
   src->input_stream =
       soup_session_send (src->session, src->msg, src->cancellable, NULL);
@@ -1380,6 +1410,8 @@ gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
     return GST_FLOW_ERROR;
   }
 
+  gst_soup_http_src_check_input_stream_interfaces (src);
+
   return GST_FLOW_OK;
 }
 
@@ -1448,6 +1480,38 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
   }
 }
 
+static gboolean
+_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
+    gpointer udata)
+{
+  GstSoupHTTPSrc *src = udata;
+
+  src->have_data = TRUE;
+  return TRUE;
+}
+
+/* Need to wait on a gsource to know when data is available */
+static gboolean
+gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
+{
+  src->have_data = FALSE;
+
+  if (!src->poll_source) {
+    src->poll_source =
+        g_pollable_input_stream_create_source ((GPollableInputStream *)
+        src->input_stream, src->cancellable);
+    g_source_set_callback (src->poll_source,
+        (GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
+    g_source_attach (src->poll_source, src->poll_context);
+  }
+
+  while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
+    g_main_context_iteration (src->poll_context, TRUE);
+  }
+
+  return src->have_data;
+}
+
 static GstFlowReturn
 gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
 {
@@ -1455,6 +1519,7 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
   GstMapInfo mapinfo;
   GstBaseSrc *bsrc;
   GstFlowReturn ret;
+  GError *err = NULL;
 
   bsrc = GST_BASE_SRC_CAST (src);
 
@@ -1469,9 +1534,34 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
     return GST_FLOW_ERROR;
   }
 
-  read_bytes =
-      g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
-      src->cancellable, NULL);
+  if (src->has_pollable_interface) {
+    while (1) {
+      read_bytes =
+          g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
+          src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
+          &err);
+      if (read_bytes == -1) {
+        if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+          g_error_free (err);
+          err = NULL;
+
+          /* no data yet, wait */
+          if (gst_soup_http_src_wait_for_data (src))
+            /* retry */
+            continue;
+        }
+      }
+      break;
+    }
+  } else {
+    read_bytes =
+        g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
+        src->cancellable, NULL);
+  }
+
+  if (err)
+    g_error_free (err);
+
   GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
       read_bytes);
 
@@ -1518,11 +1608,7 @@ retry:
 
   /* Check for pending position change */
   if (src->request_position != src->read_position) {
-    if (src->input_stream) {
-      g_input_stream_close (src->input_stream, src->cancellable, NULL);
-      g_object_unref (src->input_stream);
-      src->input_stream = NULL;
-    }
+    gst_soup_http_src_destroy_input_stream (src);
   }
 
   if (g_cancellable_is_cancelled (src->cancellable)) {
@@ -1559,10 +1645,7 @@ done:
       gst_event_unref (http_headers_event);
 
     g_mutex_lock (&src->mutex);
-    if (src->input_stream) {
-      g_object_unref (src->input_stream);
-      src->input_stream = NULL;
-    }
+    gst_soup_http_src_destroy_input_stream (src);
     g_mutex_unlock (&src->mutex);
     if (ret == GST_FLOW_CUSTOM_ERROR)
       goto retry;
index 2d5b3ee..a5e259b 100644 (file)
@@ -89,6 +89,10 @@ struct _GstSoupHTTPSrc {
 
   GCancellable *cancellable;
   GInputStream *input_stream;
+  gboolean has_pollable_interface;
+  gboolean have_data;
+  GMainContext *poll_context;
+  GSource *poll_source;
 
   /* Shoutcast/icecast metadata extraction handling. */
   gboolean iradio_mode;