Implement zero-copy and make the buffer size configurable.
authorWouter Cloetens <wouter@mind.be>
Fri, 22 Feb 2008 07:20:03 +0000 (07:20 +0000)
committerSebastian Dröge <slomo@circular-chaos.org>
Fri, 22 Feb 2008 07:20:03 +0000 (07:20 +0000)
Original commit message from CVS:
Patch by: Wouter Cloetens <wouter at mind dot be>
* configure.ac:
* ext/soup/gstsouphttpsrc.c: (gst_soup_http_src_cancel_message),
(gst_soup_http_src_finished_cb), (gst_soup_http_src_chunk_free),
(gst_soup_http_src_chunk_allocator),
(gst_soup_http_src_got_chunk_cb), (gst_soup_http_src_create),
(gst_soup_http_src_start), (gst_soup_http_src_set_proxy):
* ext/soup/gstsouphttpsrc.h:
Implement zero-copy and make the buffer size configurable.
Prefix proxy URIs with "http://" if they don't start with it
already and catch errors earlier, fixes hanging in some situations.
Fixes bug #514948.

common
ext/soup/gstsouphttpsrc.c
ext/soup/gstsouphttpsrc.h

diff --git a/common b/common
index 2a19465..135628f 160000 (submodule)
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit 2a19465fdb43a75f4d32950fd2beb1beb950eec2
+Subproject commit 135628f16d422584d3454fb9c9805e7be25760a1
index 86e6aca..af50eaa 100644 (file)
@@ -124,7 +124,7 @@ enum
   PROP_IRADIO_TITLE
 };
 
-#define DEFAULT_USER_AGENT           "GStreamer souphttpsrc"
+#define DEFAULT_USER_AGENT           "GStreamer souphttpsrc "
 
 static void gst_soup_http_src_uri_handler_init (gpointer g_iface,
     gpointer iface_data);
@@ -163,6 +163,9 @@ static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src);
 static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src);
 static void gst_soup_http_src_parse_status (SoupMessage * msg,
     GstSoupHTTPSrc * src);
+static void gst_soup_http_src_chunk_free (gpointer gstbuf);
+static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg,
+    gsize max_len, gpointer user_data);
 static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg,
     SoupBuffer * chunk, GstSoupHTTPSrc * src);
 static void gst_soup_http_src_response_cb (SoupSession * session,
@@ -459,7 +462,8 @@ gst_soup_http_src_unicodify (const gchar * str)
 static void
 gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
 {
-  soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
+  if (src->msg != NULL)
+    soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
   src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
   src->msg = NULL;
 }
@@ -645,25 +649,88 @@ gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
     GST_DEBUG_OBJECT (src, "finished, but not for current message");
     return;
   }
+  GST_DEBUG_OBJECT (src, "finished");
   if (G_UNLIKELY (src->session_io_status !=
           GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
-    /* Probably a redirect. */
-    return;
+    GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
+        ("%s", msg->reason_phrase),
+        ("libsoup status code %d", msg->status_code));
   }
-  GST_DEBUG_OBJECT (src, "finished");
   src->ret = GST_FLOW_UNEXPECTED;
   if (src->loop)
     g_main_loop_quit (src->loop);
 }
 
+/* Buffer lifecycle management.
+ *
+ * gst_soup_http_src_create() runs the GMainLoop for this element, to let
+ * Soup take control.
+ * A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and
+ * associated with a SoupBuffer.
+ * Soup reads HTTP data in the GstBuffer's data buffer.
+ * The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer.
+ * That sets gst_soup_http_src_create()'s return argument to the GstBuffer,
+ * increments its refcount (to 2), pauses the flow of data from the HTTP
+ * source to prevent gst_soup_http_src_got_chunk_cb() from being called
+ * again and breaks out of the GMainLoop.
+ * Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the
+ * SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the
+ * refcount (to 1).
+ * gst_soup_http_src_create() returns the GstBuffer. It will be freed by a
+ * downstream element.
+ * If Soup fails to read HTTP data, it does not call
+ * gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and
+ * calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's
+ * refcount to 0, freeing it.
+ */
+
+static void
+gst_soup_http_src_chunk_free (gpointer gstbuf)
+{
+  gst_buffer_unref (GST_BUFFER_CAST (gstbuf));
+}
+
+static SoupBuffer *
+gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len,
+    gpointer user_data)
+{
+  GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data;
+  GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
+  GstBuffer *gstbuf;
+  SoupBuffer *soupbuf;
+  gsize length;
+  GstFlowReturn rc;
+
+  if (max_len)
+    length = MIN (basesrc->blocksize, max_len);
+  else
+    length = basesrc->blocksize;
+  GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT,
+      length, max_len);
+
+  rc = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc),
+      GST_BUFFER_OFFSET_NONE, length,
+      GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), &gstbuf);
+  if (G_UNLIKELY (rc != GST_FLOW_OK)) {
+    /* Failed to allocate buffer. Stall SoupSession and return error code
+     * to create(). */
+    src->ret = rc;
+    g_main_loop_quit (src->loop);
+    return NULL;
+  }
+
+  soupbuf = soup_buffer_new_with_owner (GST_BUFFER_DATA (gstbuf), length,
+      gstbuf, gst_soup_http_src_chunk_free);
+
+  return soupbuf;
+}
+
 static void
 gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
     GstSoupHTTPSrc * src)
 {
   GstBaseSrc *basesrc;
   guint64 new_position;
-  const char *data;
-  gsize length;
 
   if (G_UNLIKELY (msg != src->msg)) {
     GST_DEBUG_OBJECT (src, "got chunk, but not for current message");
@@ -675,22 +742,22 @@ gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
     return;
   }
   basesrc = GST_BASE_SRC_CAST (src);
-  data = chunk->data;
-  length = chunk->length;
-  GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes", length);
-
-  /* Create the buffer. */
-  src->ret = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc),
-      basesrc->segment.last_stop, length,
-      GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), src->outbuf);
-  if (G_LIKELY (src->ret == GST_FLOW_OK)) {
-    memcpy (GST_BUFFER_DATA (*src->outbuf), data, length);
-    new_position = src->read_position + length;
-    if (G_LIKELY (src->request_position == src->read_position))
-      src->request_position = new_position;
-    src->read_position = new_position;
-  }
-
+  GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes",
+      chunk->length);
+
+  /* Extract the GstBuffer from the SoupBuffer and set its fields. */
+  *src->outbuf = GST_BUFFER_CAST (soup_buffer_get_owner (chunk));
+  gst_buffer_ref (*src->outbuf);
+  GST_BUFFER_SIZE (*src->outbuf) = chunk->length;
+  GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.last_stop;
+  gst_buffer_set_caps (*src->outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)));
+
+  new_position = src->read_position + chunk->length;
+  if (G_LIKELY (src->request_position == src->read_position))
+    src->request_position = new_position;
+  src->read_position = new_position;
+
+  src->ret = GST_FLOW_OK;
   g_main_loop_quit (src->loop);
   gst_soup_http_src_session_pause_message (src);
 }
@@ -789,10 +856,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
     src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
     soup_message_headers_append (src->msg->request_headers, "Connection",
         "close");
-    if (src->user_agent) {
-      soup_message_headers_append (src->msg->request_headers, "User-Agent",
-          src->user_agent);
-    }
     if (src->iradio_mode) {
       soup_message_headers_append (src->msg->request_headers, "icy-metadata",
           "1");
@@ -808,6 +871,8 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
         G_CALLBACK (gst_soup_http_src_got_chunk_cb), src);
     soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS |
         (src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
+    soup_message_set_chunk_allocator (src->msg,
+        gst_soup_http_src_chunk_allocator, src, NULL);
     gst_soup_http_src_add_range_header (src, src->request_position);
   }
 
@@ -828,10 +893,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
         GST_DEBUG_OBJECT (src, "Queueing connection request");
         gst_soup_http_src_queue_message (src);
         break;
-      case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED:
-        GST_DEBUG_OBJECT (src, "Connection closed");
-        gst_soup_http_src_cancel_message (src);
-        break;
       case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED:
         break;
       case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING:
@@ -874,11 +935,12 @@ gst_soup_http_src_start (GstBaseSrc * bsrc)
   if (src->proxy == NULL)
     src->session =
         soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
-        src->context, NULL);
+        src->context, SOUP_SESSION_USER_AGENT, src->user_agent, NULL);
   else
     src->session =
         soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
-        src->context, SOUP_SESSION_PROXY_URI, src->proxy, NULL);
+        src->context, SOUP_SESSION_PROXY_URI, src->proxy,
+        SOUP_SESSION_USER_AGENT, src->user_agent, NULL);
   if (!src->session) {
     GST_ELEMENT_ERROR (src, LIBRARY, INIT,
         (NULL), ("Failed to create async session"));
@@ -996,7 +1058,14 @@ gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src, const gchar * uri)
     soup_uri_free (src->proxy);
     src->proxy = NULL;
   }
-  src->proxy = soup_uri_new (uri);
+  if (g_str_has_prefix (uri, "http://")) {
+    src->proxy = soup_uri_new (uri);
+  } else {
+    gchar *new_uri = g_strconcat ("http://", uri, NULL);
+
+    src->proxy = soup_uri_new (new_uri);
+    g_free (new_uri);
+  }
 
   return TRUE;
 }
index 14891d9..9e7d81c 100644 (file)
@@ -42,7 +42,6 @@ typedef enum {
   GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE,
   GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED,
   GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING,
-  GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED,
 } GstSoupHTTPSrcSessionIOStatus;
 
 struct _GstSoupHTTPSrc {