soup: move libsoup session into its own thread
authorDaniel Kolesa <dkolesa@igalia.com>
Fri, 21 Jan 2022 15:09:30 +0000 (16:09 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 28 Jan 2022 08:49:09 +0000 (08:49 +0000)
Starting with libsoup3, there is no attempt to handle thread safety
inside the library, and it was never considered fully safe before
either. Therefore, move all session handling into its own thread.

The libsoup thread has its own context and main loop. When some
request is made or a response needs to be read, an idle source
is created to issue that; the gstreamer thread issuing that waits
for that to be complete. There is a per-src condition variable to
deal with that.

Since the thread/loop needs to be longer-lived than the soup
session itself, a wrapper object is provided to contain them. The
soup session only has a single reference, owned by the wrapper
object.

It is no longer possible to force an external session, since this
does not seem to be used anywhere within gstreamer and would be
tricky to implement; this is because one would not have to provide
just a session, but also the complete thread arrangement made in
the same way as the system currently does internally, in order to
be safe.

Messages are still built gstreamer-side. It is safe to do so until
the message is sent on the session. Headers are also processed on
the gstreamer side, which should likewise be safe.

All requests as well as reads on the libsoup thread are issued
asynchronously. That allows libsoup to schedule things with as
little blocking as possible, and means that concurrent access
to the session is possible, when sharing the session.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/947

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1555>

subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c
subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h
subprojects/gst-plugins-good/ext/soup/gstsouploader.c
subprojects/gst-plugins-good/ext/soup/gstsouploader.h

index 8496e8343f1407be5e043944150cb21e44497091..ec44d48bffbe38abfe4f04d88beaa12617e737fb 100644 (file)
 
 #include <gst/tag/tag.h>
 
+/* this is a simple wrapper class around SoupSession; it exists in order to
+ * have a refcountable owner for the actual SoupSession + the thread it runs
+ * in and its main loop (we cannot inverse the ownership hierarchy, because
+ * the thread + loop are actually longer lived than the session)
+ *
+ * it is entirely private to this implementation
+ */
+
+#define GST_TYPE_SOUP_SESSION (gst_soup_session_get_type())
+#define GST_SOUP_SESSION(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SOUP_SESSION, GstSoupSession))
+
+GType gst_soup_session_get_type (void);
+
+typedef struct _GstSoupSessionClass GstSoupSessionClass;
+
+struct _GstSoupSession
+{
+  GObject parent_instance;
+
+  SoupSession *session;
+  GThread *thread;
+  GMainLoop *loop;
+};
+
+struct _GstSoupSessionClass
+{
+  GObjectClass parent_class;
+};
+
+G_DEFINE_TYPE (GstSoupSession, gst_soup_session, G_TYPE_OBJECT);
+
+static void
+gst_soup_session_init (GstSoupSession * sess)
+{
+}
+
+static gboolean
+_soup_session_finalize_cb (gpointer user_data)
+{
+  GstSoupSession *sess = user_data;
+
+  g_clear_object (&sess->session);
+  g_main_loop_quit (sess->loop);
+
+  return FALSE;
+}
+
+static void
+gst_soup_session_finalize (GObject * obj)
+{
+  GstSoupSession *sess = GST_SOUP_SESSION (obj);
+  GSource *src;
+
+  /* handle disposing of failure cases */
+  if (!sess->loop)
+    return;
+
+  src = g_idle_source_new ();
+
+  g_source_set_callback (src, _soup_session_finalize_cb, sess, NULL);
+  g_source_attach (src, g_main_loop_get_context (sess->loop));
+  g_source_unref (src);
+
+  /* finish off thread and the loop; ensure it's not from the thread */
+  g_assert (!g_main_context_is_owner (g_main_loop_get_context (sess->loop)));
+  g_thread_join (sess->thread);
+  g_main_loop_unref (sess->loop);
+}
+
+static void
+gst_soup_session_class_init (GstSoupSessionClass * klass)
+{
+  GObjectClass *gclass = G_OBJECT_CLASS (klass);
+
+  gclass->finalize = gst_soup_session_finalize;
+}
+
 GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug);
 #define GST_CAT_DEFAULT souphttpsrc_debug
 
@@ -176,7 +253,6 @@ static gboolean gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src,
 static char *gst_soup_http_src_unicodify (const char *str);
 static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src,
     const gchar * method);
-static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src);
 static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src,
     guint64 offset, guint64 stop_offset);
 static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src);
@@ -455,6 +531,7 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
   src->retry_count = 0;
   src->have_size = FALSE;
   src->got_headers = FALSE;
+  src->headers_ret = GST_FLOW_OK;
   src->seekable = FALSE;
   src->read_position = 0;
   src->request_position = 0;
@@ -467,12 +544,6 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
   src->last_socket_read_time = 0;
 
   g_cancellable_reset (src->cancellable);
-  g_mutex_lock (&src->mutex);
-  if (src->input_stream) {
-    g_object_unref (src->input_stream);
-    src->input_stream = NULL;
-  }
-  g_mutex_unlock (&src->mutex);
 
   gst_caps_replace (&src->src_caps, NULL);
   g_free (src->iradio_name);
@@ -488,8 +559,8 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
 {
   const gchar *proxy;
 
-  g_mutex_init (&src->mutex);
-  g_cond_init (&src->have_headers_cond);
+  g_mutex_init (&src->session_mutex);
+  g_cond_init (&src->session_cond);
   src->cancellable = g_cancellable_new ();
   src->location = NULL;
   src->redirection_uri = NULL;
@@ -503,7 +574,6 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
   src->iradio_mode = DEFAULT_IRADIO_MODE;
   src->session = NULL;
   src->external_session = NULL;
-  src->forced_external_session = FALSE;
   src->msg = NULL;
   src->timeout = DEFAULT_TIMEOUT;
   src->log_level = DEFAULT_SOUP_LOG_LEVEL;
@@ -537,10 +607,7 @@ gst_soup_http_src_dispose (GObject * gobject)
 
   gst_soup_http_src_session_close (src);
 
-  if (src->external_session) {
-    g_object_unref (src->external_session);
-    src->external_session = NULL;
-  }
+  g_clear_object (&src->external_session);
 
   G_OBJECT_CLASS (parent_class)->dispose (gobject);
 }
@@ -552,8 +619,8 @@ gst_soup_http_src_finalize (GObject * gobject)
 
   GST_DEBUG_OBJECT (src, "finalize");
 
-  g_mutex_clear (&src->mutex);
-  g_cond_clear (&src->have_headers_cond);
+  g_mutex_clear (&src->session_mutex);
+  g_cond_clear (&src->session_cond);
   g_object_unref (src->cancellable);
   g_free (src->location);
   g_free (src->redirection_uri);
@@ -809,13 +876,6 @@ gst_soup_http_src_unicodify (const gchar * str)
   return gst_tag_freeform_string_to_utf8 (str, -1, env_vars);
 }
 
-static void
-gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
-{
-  g_cancellable_cancel (src->cancellable);
-  g_cond_signal (&src->have_headers_cond);
-}
-
 static gboolean
 gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src, guint64 offset,
     guint64 stop_offset)
@@ -921,10 +981,136 @@ gst_soup_http_src_add_extra_headers (GstSoupHTTPSrc * src)
   return gst_structure_foreach (src->extra_headers, _append_extra_headers, src);
 }
 
+static gpointer
+thread_func (gpointer user_data)
+{
+  GstSoupHTTPSrc *src = user_data;
+  GMainContext *ctx;
+
+  GST_DEBUG_OBJECT (src, "thread start");
+
+  ctx = g_main_loop_get_context (src->session->loop);
+
+  g_main_context_push_thread_default (ctx);
+
+  /* We explicitly set User-Agent to NULL here and overwrite it per message
+   * to be able to have the same session with different User-Agents per
+   * source */
+  src->session->session =
+      _soup_session_new_with_options ("user-agent", NULL,
+      "timeout", src->timeout, "tls-interaction", src->tls_interaction,
+      /* Unset the limit the number of maximum allowed connections */
+      "max-conns", src->session_is_shared ? G_MAXINT : 10,
+      "max-conns-per-host", src->session_is_shared ? G_MAXINT : 2, NULL);
+
+  if (!src->session->session) {
+    return NULL;
+  }
+
+  if (gst_soup_loader_get_api_version () == 3) {
+    if (src->proxy != NULL) {
+      GProxyResolver *proxy_resolver;
+      char *proxy_string = gst_soup_uri_to_string (src->proxy);
+      proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL);
+      g_free (proxy_string);
+      g_object_set (src->session->session, "proxy-resolver", proxy_resolver,
+          NULL);
+      g_object_unref (proxy_resolver);
+    }
+  } else {
+    g_object_set (src->session->session, "ssl-strict", src->ssl_strict, NULL);
+    if (src->proxy != NULL) {
+      g_object_set (src->session->session, "proxy-uri", src->proxy->soup_uri,
+          NULL);
+    }
+  }
+
+  gst_soup_util_log_setup (src->session->session, src->log_level,
+      GST_ELEMENT (src));
+  if (gst_soup_loader_get_api_version () < 3) {
+    _soup_session_add_feature_by_type (src->session->session,
+        _soup_content_decoder_get_type ());
+  }
+  _soup_session_add_feature_by_type (src->session->session,
+      _soup_cookie_jar_get_type ());
+
+  if (src->session_is_shared) {
+    GstContext *context;
+    GstMessage *message;
+    GstStructure *s;
+
+    GST_DEBUG_OBJECT (src, "Sharing session %p", src->session->session);
+
+    context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE);
+    s = gst_context_writable_structure (context);
+    gst_structure_set (s, "session", GST_TYPE_SOUP_SESSION, src->session, NULL);
+
+    /* during this time the src is locked by the parent thread,
+     * which is waiting, so this is safe to do
+     */
+    GST_OBJECT_UNLOCK (src);
+    gst_element_set_context (GST_ELEMENT_CAST (src), context);
+    message = gst_message_new_have_context (GST_OBJECT_CAST (src), context);
+    gst_element_post_message (GST_ELEMENT_CAST (src), message);
+    GST_OBJECT_LOCK (src);
+  } else {
+    src->session_is_shared = FALSE;
+  }
+
+  /* soup2: connect the authenticate handler for the src that spawned the
+   * session (i.e. the first owner); other users of this session will connect
+   * their own after fetching the external session; the callback will handle
+   * this correctly (it checks if the message belongs to the current src
+   * and exits early if it does not)
+   */
+  if (gst_soup_loader_get_api_version () < 3) {
+    g_signal_connect (src->session->session, "authenticate",
+        G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src);
+  }
+
+  if (!src->session_is_shared) {
+    if (src->tls_database)
+      g_object_set (src->session->session, "tls-database", src->tls_database,
+          NULL);
+    else if (gst_soup_loader_get_api_version () == 2) {
+      if (src->ssl_ca_file)
+        g_object_set (src->session->session, "ssl-ca-file", src->ssl_ca_file,
+            NULL);
+      else
+        g_object_set (src->session->session, "ssl-use-system-ca-file",
+            src->ssl_use_system_ca_file, NULL);
+    }
+  }
+
+  g_main_loop_run (src->session->loop);
+
+  g_main_context_pop_thread_default (ctx);
+
+  GST_DEBUG_OBJECT (src, "thread stop");
+
+  return NULL;
+}
+
+static gboolean
+_session_ready_cb (gpointer user_data)
+{
+  GstSoupHTTPSrc *src = user_data;
+
+  GST_DEBUG_OBJECT (src, "thread ready");
+
+  g_mutex_lock (&src->session_mutex);
+  g_cond_signal (&src->session_cond);
+  g_mutex_unlock (&src->session_mutex);
+
+  return FALSE;
+}
+
+/* called with session_mutex taken */
 static gboolean
 gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
 {
-  GProxyResolver *proxy_resolver;
+  GstQuery *query;
+  gboolean can_share;
 
   if (src->session) {
     GST_DEBUG_OBJECT (src, "Session is already open");
@@ -937,158 +1123,148 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
     return FALSE;
   }
 
-  if (!src->session) {
-    GstQuery *query;
-    gboolean can_share = (src->timeout == DEFAULT_TIMEOUT)
-        && (src->cookies == NULL)
-        && (src->ssl_strict == DEFAULT_SSL_STRICT)
-        && (src->tls_interaction == NULL) && (src->proxy == NULL)
-        && (src->tls_database == DEFAULT_TLS_DATABASE);
-
-    if (gst_soup_loader_get_api_version () == 2)
-      can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) &&
-          (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE);
-
-    query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT);
-    if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) {
-      GstContext *context;
-
-      gst_query_parse_context (query, &context);
-      gst_element_set_context (GST_ELEMENT_CAST (src), context);
-    } else {
-      GstMessage *message;
+  can_share = (src->timeout == DEFAULT_TIMEOUT)
+      && (src->cookies == NULL)
+      && (src->ssl_strict == DEFAULT_SSL_STRICT)
+      && (src->tls_interaction == NULL) && (src->proxy == NULL)
+      && (src->tls_database == DEFAULT_TLS_DATABASE);
 
-      message =
-          gst_message_new_need_context (GST_OBJECT_CAST (src),
-          GST_SOUP_SESSION_CONTEXT);
-      gst_element_post_message (GST_ELEMENT_CAST (src), message);
-    }
-    gst_query_unref (query);
+  if (gst_soup_loader_get_api_version () == 2)
+    can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) &&
+        (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE);
 
-    GST_OBJECT_LOCK (src);
-    if (src->external_session && (can_share || src->forced_external_session)) {
-      GST_DEBUG_OBJECT (src, "Using external session %p",
-          src->external_session);
-      src->session = g_object_ref (src->external_session);
-      src->session_is_shared = TRUE;
-    } else {
-      GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share);
-
-      /* We explicitly set User-Agent to NULL here and overwrite it per message
-       * to be able to have the same session with different User-Agents per
-       * source */
-      src->session =
-          _soup_session_new_with_options ("user-agent", NULL,
-          "timeout", src->timeout, "tls-interaction", src->tls_interaction,
-          /* Unset the limit the number of maximum allowed connections */
-          "max-conns", can_share ? G_MAXINT : 10,
-          "max-conns-per-host", can_share ? G_MAXINT : 2, NULL);
-
-      if (gst_soup_loader_get_api_version () == 3) {
-        if (src->proxy != NULL) {
-          char *proxy_string = gst_soup_uri_to_string (src->proxy);
-          proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL);
-          g_free (proxy_string);
-          g_object_set (src->session, "proxy-resolver", proxy_resolver, NULL);
-          g_object_unref (proxy_resolver);
-        }
-      } else {
-        g_object_set (src->session, "ssl-strict", src->ssl_strict, NULL);
-        if (src->proxy != NULL) {
-          g_object_set (src->session, "proxy-uri", src->proxy->soup_uri, NULL);
-        }
-      }
+  query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT);
+  if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) {
+    GstContext *context;
 
-      if (src->session) {
-        gst_soup_util_log_setup (src->session, src->log_level,
-            GST_ELEMENT (src));
-        if (gst_soup_loader_get_api_version () < 3) {
-          _soup_session_add_feature_by_type (src->session,
-              _soup_content_decoder_get_type ());
-        }
-        _soup_session_add_feature_by_type (src->session,
-            _soup_cookie_jar_get_type ());
-
-        if (can_share) {
-          GstContext *context;
-          GstMessage *message;
-          GstStructure *s;
-
-          GST_DEBUG_OBJECT (src, "Sharing session %p", src->session);
-          src->session_is_shared = TRUE;
-
-          context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE);
-          s = gst_context_writable_structure (context);
-          gst_structure_set (s, "session", _soup_session_get_type (),
-              src->session, "force", G_TYPE_BOOLEAN, FALSE, NULL);
-
-          gst_object_ref (src->session);
-          GST_OBJECT_UNLOCK (src);
-          gst_element_set_context (GST_ELEMENT_CAST (src), context);
-          message =
-              gst_message_new_have_context (GST_OBJECT_CAST (src), context);
-          gst_element_post_message (GST_ELEMENT_CAST (src), message);
-          GST_OBJECT_LOCK (src);
-          gst_object_unref (src->session);
-        } else {
-          src->session_is_shared = FALSE;
-        }
-      }
-    }
+    gst_query_parse_context (query, &context);
+    gst_element_set_context (GST_ELEMENT_CAST (src), context);
+  } else {
+    GstMessage *message;
 
-    if (!src->session) {
-      GST_ELEMENT_ERROR (src, LIBRARY, INIT,
-          (NULL), ("Failed to create session"));
-      GST_OBJECT_UNLOCK (src);
-      return FALSE;
-    }
+    message =
+        gst_message_new_need_context (GST_OBJECT_CAST (src),
+        GST_SOUP_SESSION_CONTEXT);
+    gst_element_post_message (GST_ELEMENT_CAST (src), message);
+  }
+  gst_query_unref (query);
 
+  GST_OBJECT_LOCK (src);
+
+  src->session_is_shared = can_share;
+
+  if (src->external_session && can_share) {
+    GST_DEBUG_OBJECT (src, "Using external session %p", src->external_session);
+    src->session = g_object_ref (src->external_session);
+    /* for soup2, connect another authenticate handler; see thread_func */
     if (gst_soup_loader_get_api_version () < 3) {
-      g_signal_connect (src->session, "authenticate",
+      g_signal_connect (src->session->session, "authenticate",
           G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src);
     }
+  } else {
+    GMainContext *ctx;
+    GSource *source;
 
-    if (!src->session_is_shared) {
-      if (src->tls_database)
-        g_object_set (src->session, "tls-database", src->tls_database, NULL);
-      else if (gst_soup_loader_get_api_version () == 2) {
-        if (src->ssl_ca_file)
-          g_object_set (src->session, "ssl-ca-file", src->ssl_ca_file, NULL);
-        else
-          g_object_set (src->session, "ssl-use-system-ca-file",
-              src->ssl_use_system_ca_file, NULL);
-      }
+    GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share);
+
+    src->session =
+        GST_SOUP_SESSION (g_object_new (GST_TYPE_SOUP_SESSION, NULL));
+
+    ctx = g_main_context_new ();
+
+    src->session->loop = g_main_loop_new (ctx, FALSE);
+    /* now owned by the loop */
+    g_main_context_unref (ctx);
+
+    src->session->thread = g_thread_try_new ("souphttpsrc-thread",
+        thread_func, src, NULL);
+
+    if (!src->session->thread) {
+      goto err;
     }
-    GST_OBJECT_UNLOCK (src);
-  } else {
-    GST_DEBUG_OBJECT (src, "Re-using session");
+
+    source = g_idle_source_new ();
+    g_source_set_callback (source, _session_ready_cb, src, NULL);
+    g_source_attach (source, ctx);
+    g_source_unref (source);
+
+    GST_DEBUG_OBJECT (src, "Waiting for thread to start...");
+    while (!g_main_loop_is_running (src->session->loop))
+      g_cond_wait (&src->session_cond, &src->session_mutex);
+    GST_DEBUG_OBJECT (src, "Soup thread started");
   }
 
+  GST_OBJECT_UNLOCK (src);
+
   return TRUE;
+
+err:
+  g_clear_object (&src->session);
+  GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), ("Failed to create session"));
+  GST_OBJECT_UNLOCK (src);
+
+  return FALSE;
+}
+
+static gboolean
+_session_close_cb (gpointer user_data)
+{
+  GstSoupHTTPSrc *src = user_data;
+
+  if (src->msg) {
+    gst_soup_session_cancel_message (src->session->session, src->msg,
+        src->cancellable);
+    g_clear_object (&src->msg);
+  }
+
+  if (src->session_is_shared)
+    _soup_session_abort (src->session->session);
+
+  /* there may be multiple of this callback attached to the session,
+   * each with different data pointer; disconnect the one we are closing
+   * the session for, leave the others alone
+   */
+  g_signal_handlers_disconnect_by_func (src->session->session,
+      G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src);
+
+  g_mutex_lock (&src->session_mutex);
+  g_clear_object (&src->session);
+  g_cond_signal (&src->session_cond);
+  g_mutex_unlock (&src->session_mutex);
+
+  return FALSE;
 }
 
 static void
 gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
 {
+  GSource *source;
+  GstSoupSession *sess;
+
   GST_DEBUG_OBJECT (src, "Closing session");
 
-  g_mutex_lock (&src->mutex);
-  if (src->msg) {
-    gst_soup_session_cancel_message (src->session, src->msg, src->cancellable);
-    g_object_unref (src->msg);
-    src->msg = NULL;
+  if (!src->session) {
+    return;
   }
 
-  if (src->session) {
-    if (!src->session_is_shared)
-      _soup_session_abort (src->session);
-    g_signal_handlers_disconnect_by_func (src->session,
-        G_CALLBACK (gst_soup_http_src_authenticate_cb), src);
-    g_object_unref (src->session);
-    src->session = NULL;
-  }
+  /* ensure _session_close_cb does not deadlock us */
+  sess = g_object_ref (src->session);
+
+  source = g_idle_source_new ();
+
+  g_mutex_lock (&src->session_mutex);
+
+  g_source_set_callback (source, _session_close_cb, src, NULL);
+  g_source_attach (source, g_main_loop_get_context (src->session->loop));
+  g_source_unref (source);
+
+  while (src->session)
+    g_cond_wait (&src->session_cond, &src->session_mutex);
 
-  g_mutex_unlock (&src->mutex);
+  g_mutex_unlock (&src->session_mutex);
+
+  /* finally dispose of our reference from the gst thread */
+  g_object_unref (sess);
 }
 
 static void
@@ -1235,7 +1411,6 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
   }
 
   src->got_headers = TRUE;
-  g_cond_broadcast (&src->have_headers_cond);
 
   http_headers_event =
       gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, http_headers);
@@ -1610,51 +1785,98 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
   return TRUE;
 }
 
-/* Lock taken */
+struct GstSoupSendSrc
+{
+  GstSoupHTTPSrc *src;
+  GError *error;
+};
+
+static void
+_session_send_cb (GObject * source, GAsyncResult * res, gpointer user_data)
+{
+  struct GstSoupSendSrc *msrc = user_data;
+  GstSoupHTTPSrc *src = msrc->src;
+  GError *error = NULL;
+
+  g_mutex_lock (&src->session_mutex);
+
+  src->input_stream = _soup_session_send_finish (src->session->session,
+      res, &error);
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+    src->headers_ret = GST_FLOW_FLUSHING;
+  } else {
+    src->headers_ret = gst_soup_http_src_got_headers (src, src->msg);
+  }
+
+  if (!src->input_stream) {
+    GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message);
+    msrc->error = error;
+  }
+
+  g_cond_broadcast (&src->session_cond);
+  g_mutex_unlock (&src->session_mutex);
+}
+
+static gboolean
+_session_send_idle_cb (gpointer user_data)
+{
+  struct GstSoupSendSrc *msrc = user_data;
+  GstSoupHTTPSrc *src = msrc->src;
+
+  _soup_session_send_async (src->session->session, src->msg, src->cancellable,
+      _session_send_cb, msrc);
+
+  return FALSE;
+}
+
+/* called with session lock taken */
 static GstFlowReturn
 gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
 {
   GstFlowReturn ret;
-  GError *error = NULL;
+  GSource *source;
+  struct GstSoupSendSrc msrc;
 
   g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
   g_assert (src->input_stream == NULL);
 
-  src->input_stream =
-      _soup_session_send (src->session, src->msg, src->cancellable, &error);
+  msrc.src = src;
+  msrc.error = NULL;
 
-  if (error)
-    GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message);
+  source = g_idle_source_new ();
 
-  if (g_cancellable_is_cancelled (src->cancellable)) {
-    ret = GST_FLOW_FLUSHING;
-    goto done;
-  }
+  src->headers_ret = GST_FLOW_OK;
+
+  g_source_set_callback (source, _session_send_idle_cb, &msrc, NULL);
+  g_source_attach (source, g_main_loop_get_context (src->session->loop));
+  g_source_unref (source);
+
+  while (!src->input_stream && !msrc.error)
+    g_cond_wait (&src->session_cond, &src->session_mutex);
+
+  ret = src->headers_ret;
 
-  ret = gst_soup_http_src_got_headers (src, src->msg);
   if (ret != GST_FLOW_OK) {
     goto done;
   }
 
   if (!src->input_stream) {
-    GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s", error->message);
+    GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s",
+        msrc.error->message);
     ret = GST_FLOW_ERROR;
     goto done;
   }
 
-  if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (src->msg))) {
-    GST_DEBUG_OBJECT (src, "Successfully got a reply");
-  } else {
-    /* FIXME - be more helpful to people debugging */
-    ret = GST_FLOW_ERROR;
-  }
+  /* if an input stream exists, it was always successful */
+  GST_DEBUG_OBJECT (src, "Successfully got a reply");
 
 done:
-  if (error)
-    g_error_free (error);
+  g_clear_error (&msrc.error);
   return ret;
 }
 
+/* called with session lock taken */
 static GstFlowReturn
 gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method)
 {
@@ -1791,13 +2013,49 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
   }
 }
 
+struct GstSoupReadResult
+{
+  GstSoupHTTPSrc *src;
+  GError *error;
+  void *buffer;
+  gsize bufsize;
+  gssize nbytes;
+};
+
+static void
+_session_read_cb (GObject * source, GAsyncResult * ret, gpointer user_data)
+{
+  struct GstSoupReadResult *res = user_data;
+
+  g_mutex_lock (&res->src->session_mutex);
+
+  res->nbytes = g_input_stream_read_finish (G_INPUT_STREAM (source),
+      ret, &res->error);
+
+  g_cond_signal (&res->src->session_cond);
+  g_mutex_unlock (&res->src->session_mutex);
+}
+
+static gboolean
+_session_read_idle_cb (gpointer user_data)
+{
+  struct GstSoupReadResult *res = user_data;
+
+  g_input_stream_read_async (res->src->input_stream, res->buffer,
+      res->bufsize, G_PRIORITY_DEFAULT, res->src->cancellable,
+      _session_read_cb, res);
+
+  return FALSE;
+}
+
 static GstFlowReturn
 gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
 {
-  gssize read_bytes;
+  struct GstSoupReadResult res;
   GstMapInfo mapinfo;
   GstBaseSrc *bsrc;
   GstFlowReturn ret;
+  GSource *source;
 
   bsrc = GST_BASE_SRC_CAST (src);
 
@@ -1812,31 +2070,54 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
     return GST_FLOW_ERROR;
   }
 
-  read_bytes =
-      g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
-      src->cancellable, NULL);
+  res.src = src;
+  res.buffer = mapinfo.data;
+  res.bufsize = mapinfo.size;
+  res.error = NULL;
+  res.nbytes = -1;
+
+  source = g_idle_source_new ();
+
+  g_mutex_lock (&src->session_mutex);
+
+  g_source_set_callback (source, _session_read_idle_cb, &res, NULL);
+  /* invoke on libsoup thread */
+  g_source_attach (source, g_main_loop_get_context (src->session->loop));
+  g_source_unref (source);
+
+  /* wait for it */
+  while (!res.error && res.nbytes < 0)
+    g_cond_wait (&src->session_cond, &src->session_mutex);
+  g_mutex_unlock (&src->session_mutex);
+
   GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
-      read_bytes);
+      res.nbytes);
 
-  g_mutex_lock (&src->mutex);
-  if (g_cancellable_is_cancelled (src->cancellable)) {
+  if (res.error) {
+    /* retry by default */
+    GstFlowReturn ret = GST_FLOW_CUSTOM_ERROR;
+    if (g_error_matches (res.error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+      ret = GST_FLOW_FLUSHING;
+    } else {
+      GST_ERROR_OBJECT (src, "Got error from libsoup: %s", res.error->message);
+    }
+    g_error_free (res.error);
     gst_buffer_unmap (*outbuf, &mapinfo);
     gst_buffer_unref (*outbuf);
-    g_mutex_unlock (&src->mutex);
-    return GST_FLOW_FLUSHING;
+    return ret;
   }
 
   gst_buffer_unmap (*outbuf, &mapinfo);
-  if (read_bytes > 0) {
-    gst_buffer_set_size (*outbuf, read_bytes);
+  if (res.nbytes > 0) {
+    gst_buffer_set_size (*outbuf, res.nbytes);
     GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position;
     ret = GST_FLOW_OK;
-    gst_soup_http_src_update_position (src, read_bytes);
+    gst_soup_http_src_update_position (src, res.nbytes);
 
     /* Got some data, reset retry counter */
     src->retry_count = 0;
 
-    gst_soup_http_src_check_update_blocksize (src, read_bytes);
+    gst_soup_http_src_check_update_blocksize (src, res.nbytes);
 
     src->last_socket_read_time = g_get_monotonic_time () * GST_USECOND;
 
@@ -1845,39 +2126,92 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
      * otherwise we would have to cancel the message and close the connection
      */
     if (bsrc->segment.stop != -1
-        && bsrc->segment.position + read_bytes >= bsrc->segment.stop) {
+        && bsrc->segment.position + res.nbytes >= bsrc->segment.stop) {
+      SoupMessage *msg = src->msg;
       guint8 tmp[128];
 
-      g_object_unref (src->msg);
+      res.buffer = tmp;
+      res.bufsize = sizeof (tmp);
+      res.nbytes = -1;
+
       src->msg = NULL;
       src->have_body = TRUE;
 
+      g_mutex_lock (&src->session_mutex);
+
+      source = g_idle_source_new ();
+
+      g_source_set_callback (source, _session_read_idle_cb, &res, NULL);
       /* This should return immediately as we're at the end of the range */
-      read_bytes =
-          g_input_stream_read (src->input_stream, tmp, sizeof (tmp),
-          src->cancellable, NULL);
-      if (read_bytes > 0)
+      g_source_attach (source, g_main_loop_get_context (src->session->loop));
+      g_source_unref (source);
+
+      while (!res.error && res.nbytes < 0)
+        g_cond_wait (&src->session_cond, &src->session_mutex);
+      g_mutex_unlock (&src->session_mutex);
+
+      g_clear_error (&res.error);
+      g_object_unref (msg);
+
+      if (res.nbytes > 0)
         GST_ERROR_OBJECT (src,
-            "Read %" G_GSIZE_FORMAT " bytes after end of range", read_bytes);
+            "Read %" G_GSIZE_FORMAT " bytes after end of range", res.nbytes);
     }
   } else {
     gst_buffer_unref (*outbuf);
-    if (read_bytes < 0 ||
-        (src->have_size && src->read_position < src->content_size)) {
+    if (src->have_size && src->read_position < src->content_size) {
       /* Maybe the server disconnected, retry */
       ret = GST_FLOW_CUSTOM_ERROR;
     } else {
-      g_object_unref (src->msg);
+      g_clear_object (&src->msg);
       src->msg = NULL;
       ret = GST_FLOW_EOS;
       src->have_body = TRUE;
     }
   }
-  g_mutex_unlock (&src->mutex);
+
+  g_clear_error (&res.error);
 
   return ret;
 }
 
+static gboolean
+_session_stream_clear_cb (gpointer user_data)
+{
+  GstSoupHTTPSrc *src = user_data;
+
+  g_mutex_lock (&src->session_mutex);
+
+  g_clear_object (&src->input_stream);
+
+  g_cond_signal (&src->session_cond);
+  g_mutex_unlock (&src->session_mutex);
+
+  return FALSE;
+}
+
+static void
+gst_soup_http_src_stream_clear (GstSoupHTTPSrc * src)
+{
+  GSource *source;
+
+  if (!src->input_stream)
+    return;
+
+  g_mutex_lock (&src->session_mutex);
+
+  source = g_idle_source_new ();
+
+  g_source_set_callback (source, _session_stream_clear_cb, src, NULL);
+  g_source_attach (source, g_main_loop_get_context (src->session->loop));
+  g_source_unref (source);
+
+  while (src->input_stream)
+    g_cond_wait (&src->session_cond, &src->session_mutex);
+
+  g_mutex_unlock (&src->session_mutex);
+}
+
 static GstFlowReturn
 gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
@@ -1888,33 +2222,28 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   src = GST_SOUP_HTTP_SRC (psrc);
 
 retry:
-  g_mutex_lock (&src->mutex);
 
   /* Check for pending position change */
-  if (src->request_position != src->read_position) {
-    if (src->input_stream) {
-      g_input_stream_close (src->input_stream, src->cancellable, NULL);
-      g_object_unref (src->input_stream);
-      src->input_stream = NULL;
-    }
+  if (src->request_position != src->read_position && src->input_stream) {
+    gst_soup_http_src_stream_clear (src);
   }
 
   if (g_cancellable_is_cancelled (src->cancellable)) {
     ret = GST_FLOW_FLUSHING;
-    g_mutex_unlock (&src->mutex);
     goto done;
   }
 
   /* If we have no open connection to the server, start one */
   if (!src->input_stream) {
     *outbuf = NULL;
+    g_mutex_lock (&src->session_mutex);
     ret =
         gst_soup_http_src_do_request (src,
         src->method ? src->method : SOUP_METHOD_GET);
     http_headers_event = src->http_headers_event;
     src->http_headers_event = NULL;
+    g_mutex_unlock (&src->session_mutex);
   }
-  g_mutex_unlock (&src->mutex);
 
   if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) {
     if (http_headers_event) {
@@ -1932,12 +2261,9 @@ done:
     if (http_headers_event)
       gst_event_unref (http_headers_event);
 
-    g_mutex_lock (&src->mutex);
     if (src->input_stream) {
-      g_object_unref (src->input_stream);
-      src->input_stream = NULL;
+      gst_soup_http_src_stream_clear (src);
     }
-    g_mutex_unlock (&src->mutex);
     if (ret == GST_FLOW_CUSTOM_ERROR) {
       ret = GST_FLOW_OK;
       goto retry;
@@ -1945,9 +2271,7 @@ done:
   }
 
   if (ret == GST_FLOW_FLUSHING) {
-    g_mutex_lock (&src->mutex);
     src->retry_count = 0;
-    g_mutex_unlock (&src->mutex);
   }
 
   return ret;
@@ -1957,10 +2281,14 @@ static gboolean
 gst_soup_http_src_start (GstBaseSrc * bsrc)
 {
   GstSoupHTTPSrc *src = GST_SOUP_HTTP_SRC (bsrc);
+  gboolean ret;
 
   GST_DEBUG_OBJECT (src, "start(\"%s\")", src->location);
 
-  return gst_soup_http_src_session_open (src);
+  g_mutex_lock (&src->session_mutex);
+  ret = gst_soup_http_src_session_open (src);
+  g_mutex_unlock (&src->session_mutex);
+  return ret;
 }
 
 static gboolean
@@ -1970,8 +2298,11 @@ gst_soup_http_src_stop (GstBaseSrc * bsrc)
 
   src = GST_SOUP_HTTP_SRC (bsrc);
   GST_DEBUG_OBJECT (src, "stop()");
+
+  gst_soup_http_src_stream_clear (src);
+
   if (src->keep_alive && !src->msg && !src->session_is_shared)
-    gst_soup_http_src_cancel_message (src);
+    g_cancellable_cancel (src->cancellable);
   else
     gst_soup_http_src_session_close (src);
 
@@ -2010,17 +2341,13 @@ gst_soup_http_src_set_context (GstElement * element, GstContext * context)
     const GstStructure *s = gst_context_get_structure (context);
 
     GST_OBJECT_LOCK (src);
-    if (src->external_session)
-      g_object_unref (src->external_session);
-    src->external_session = NULL;
-    gst_structure_get (s, "session", _soup_session_get_type (),
+
+    g_clear_object (&src->external_session);
+    gst_structure_get (s, "session", GST_TYPE_SOUP_SESSION,
         &src->external_session, NULL);
-    src->forced_external_session = FALSE;
-    gst_structure_get (s, "force", G_TYPE_BOOLEAN,
-        &src->forced_external_session, NULL);
 
-    GST_DEBUG_OBJECT (src, "Setting external session %p (force: %d)",
-        src->external_session, src->forced_external_session);
+    GST_DEBUG_OBJECT (src, "Setting external session %p",
+        src->external_session);
     GST_OBJECT_UNLOCK (src);
   }
 
@@ -2036,7 +2363,7 @@ gst_soup_http_src_unlock (GstBaseSrc * bsrc)
   src = GST_SOUP_HTTP_SRC (bsrc);
   GST_DEBUG_OBJECT (src, "unlock()");
 
-  gst_soup_http_src_cancel_message (src);
+  g_cancellable_cancel (src->cancellable);
   return TRUE;
 }
 
@@ -2080,19 +2407,20 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src)
    * loops.
    */
   if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) {
-    g_mutex_lock (&src->mutex);
+    g_mutex_lock (&src->session_mutex);
     while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable)
         && ret == GST_FLOW_OK) {
       if ((src->msg && _soup_message_get_method (src->msg) != SOUP_METHOD_HEAD)) {
         /* wait for the current request to finish */
-        g_cond_wait (&src->have_headers_cond, &src->mutex);
+        g_cond_wait (&src->session_cond, &src->session_mutex);
+        ret = src->headers_ret;
       } else {
         if (gst_soup_http_src_session_open (src)) {
           ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD);
         }
       }
     }
-    g_mutex_unlock (&src->mutex);
+    g_mutex_unlock (&src->session_mutex);
   }
 }
 
index a7184fee62e19ffbede3ad215ec44072a63ba252..89705943bf17417c48c9948249f957e011339c41 100644 (file)
@@ -45,6 +45,9 @@ typedef enum {
   GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED,
 } GstSoupHTTPSrcSessionIOStatus;
 
+/* opaque from here, implementation detail */
+typedef struct _GstSoupSession GstSoupSession;
+
 struct _GstSoupHTTPSrc {
   GstPushSrc element;
 
@@ -59,15 +62,15 @@ struct _GstSoupHTTPSrc {
   gchar *proxy_id;             /* Authentication user id for proxy URI. */
   gchar *proxy_pw;             /* Authentication user password for proxy URI. */
   gchar **cookies;             /* HTTP request cookies. */
-  SoupSession *session;        /* Async context. */
+  GstSoupSession *session;     /* Libsoup session wrapper. */
   gboolean session_is_shared;
-  SoupSession *external_session; /* Shared via GstContext */
-  gboolean forced_external_session; /* If session was explicitly set from application */
+  GstSoupSession *external_session; /* Shared via GstContext */
   SoupMessage *msg;            /* Request message. */
   gint retry_count;            /* Number of retries since we received data */
   gint max_retries;            /* Maximum number of retries */
   gchar *method;               /* HTTP method */
 
+  GstFlowReturn headers_ret;
   gboolean got_headers;        /* Already received headers from the server */
   gboolean have_size;          /* Received and parsed Content-Length
                                   header. */
@@ -111,8 +114,12 @@ struct _GstSoupHTTPSrc {
 
   guint timeout;
 
-  GMutex mutex;
-  GCond have_headers_cond;
+  /* This mutex-cond pair is used to talk to the soup session thread; it is
+   * per src to allow concurrent access to shared sessions (if it was inside
+   * the shared session structure, it would be effectively global)
+   */
+  GMutex session_mutex;
+  GCond session_cond;
 
   GstEvent *http_headers_event;
 
index 119f90706a8b51615a695ba57f4fb66f20adf04f..2c525a6e72346497ef555c5560fba682a5bed29e 100644 (file)
@@ -111,6 +111,10 @@ typedef struct _GstSoupVTable
   void (*_soup_auth_authenticate) (SoupAuth * auth, const char *username,
     const char *password);
   const char *(*_soup_message_get_method_3) (SoupMessage * msg);
+  GInputStream *(*_soup_session_send_async_2) (SoupSession * session, SoupMessage * msg,
+    GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data);
+  GInputStream *(*_soup_session_send_async_3) (SoupSession * session, SoupMessage * msg,
+    int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data);
   GInputStream *(*_soup_session_send_finish) (SoupSession * session,
     GAsyncResult * result, GError ** error);
   GInputStream *(*_soup_session_send) (SoupSession * session, SoupMessage * msg,
@@ -220,6 +224,7 @@ gst_soup_load_library (void)
         LOAD_VERSIONED_SYMBOL (2, soup_uri_to_string);
         LOAD_VERSIONED_SYMBOL (2, soup_message_get_uri);
         LOAD_VERSIONED_SYMBOL (2, soup_session_cancel_message);
+        LOAD_VERSIONED_SYMBOL (2, soup_session_send_async);
       } else {
         vtable->lib_version = 3;
         LOAD_VERSIONED_SYMBOL (3, soup_logger_new);
@@ -232,6 +237,7 @@ gst_soup_load_library (void)
         LOAD_VERSIONED_SYMBOL (3, soup_message_get_method);
         LOAD_VERSIONED_SYMBOL (3, soup_message_get_reason_phrase);
         LOAD_VERSIONED_SYMBOL (3, soup_message_get_status);
+        LOAD_VERSIONED_SYMBOL (3, soup_session_send_async);
       }
 
       LOAD_SYMBOL (soup_auth_authenticate);
@@ -795,6 +801,32 @@ _soup_message_get_method (SoupMessage * msg)
 #endif
 }
 
+void
+_soup_session_send_async (SoupSession * session, SoupMessage * msg,
+    GCancellable * cancellable, GAsyncReadyCallback callback,
+    gpointer user_data)
+{
+#ifdef STATIC_SOUP
+#if STATIC_SOUP == 2
+  return soup_session_send_async (session, msg, cancellable,
+      callback, user_data);
+#else
+  return soup_session_send_async (session, msg, G_PRIORITY_DEFAULT,
+      cancellable, callback, user_data);
+#endif
+#else
+  if (gst_soup_vtable.lib_version == 3) {
+    g_assert (gst_soup_vtable._soup_session_send_async_3 != NULL);
+    gst_soup_vtable._soup_session_send_async_3 (session, msg,
+        G_PRIORITY_DEFAULT, cancellable, callback, user_data);
+  } else {
+    g_assert (gst_soup_vtable._soup_session_send_async_2 != NULL);
+    gst_soup_vtable._soup_session_send_async_2 (session, msg,
+        cancellable, callback, user_data);
+  }
+#endif
+}
+
 GInputStream *
 _soup_session_send_finish (SoupSession * session,
     GAsyncResult * result, GError ** error)
index 3025862193aae6f76503051d762e7d8874fe92c1..ea5e16e2527b92a81e18b99c9edf510819773ebe 100644 (file)
@@ -99,6 +99,12 @@ void _soup_auth_authenticate (SoupAuth *auth, const char *username,
 
 const char *_soup_message_get_method (SoupMessage *msg);
 
+void _soup_session_send_async (SoupSession *session,
+                               SoupMessage *msg,
+                               GCancellable *cancellable,
+                               GAsyncReadyCallback callback,
+                               gpointer user_data);
+
 GInputStream *_soup_session_send_finish (SoupSession *session,
                                          GAsyncResult *result, GError **error);