#include <gst/tag/tag.h>
+/* this is a simple wrapper class around SoupSession; it exists in order to
+ * have a refcountable owner for the actual SoupSession + the thread it runs
+ * in and its main loop (we cannot inverse the ownership hierarchy, because
+ * the thread + loop are actually longer lived than the session)
+ *
+ * it is entirely private to this implementation
+ */
+
+#define GST_TYPE_SOUP_SESSION (gst_soup_session_get_type())
+#define GST_SOUP_SESSION(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SOUP_SESSION, GstSoupSession))
+
+GType gst_soup_session_get_type (void);
+
+typedef struct _GstSoupSessionClass GstSoupSessionClass;
+
+struct _GstSoupSession
+{
+ GObject parent_instance;
+
+ SoupSession *session;
+ GThread *thread;
+ GMainLoop *loop;
+};
+
+struct _GstSoupSessionClass
+{
+ GObjectClass parent_class;
+};
+
+G_DEFINE_TYPE (GstSoupSession, gst_soup_session, G_TYPE_OBJECT);
+
+static void
+gst_soup_session_init (GstSoupSession * sess)
+{
+}
+
+static gboolean
+_soup_session_finalize_cb (gpointer user_data)
+{
+ GstSoupSession *sess = user_data;
+
+ g_clear_object (&sess->session);
+ g_main_loop_quit (sess->loop);
+
+ return FALSE;
+}
+
+static void
+gst_soup_session_finalize (GObject * obj)
+{
+ GstSoupSession *sess = GST_SOUP_SESSION (obj);
+ GSource *src;
+
+ /* handle disposing of failure cases */
+ if (!sess->loop)
+ return;
+
+ src = g_idle_source_new ();
+
+ g_source_set_callback (src, _soup_session_finalize_cb, sess, NULL);
+ g_source_attach (src, g_main_loop_get_context (sess->loop));
+ g_source_unref (src);
+
+ /* finish off thread and the loop; ensure it's not from the thread */
+ g_assert (!g_main_context_is_owner (g_main_loop_get_context (sess->loop)));
+ g_thread_join (sess->thread);
+ g_main_loop_unref (sess->loop);
+}
+
+static void
+gst_soup_session_class_init (GstSoupSessionClass * klass)
+{
+ GObjectClass *gclass = G_OBJECT_CLASS (klass);
+
+ gclass->finalize = gst_soup_session_finalize;
+}
+
GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug);
#define GST_CAT_DEFAULT souphttpsrc_debug
static char *gst_soup_http_src_unicodify (const char *str);
static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src,
const gchar * method);
-static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src,
guint64 offset, guint64 stop_offset);
static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src);
src->retry_count = 0;
src->have_size = FALSE;
src->got_headers = FALSE;
+ src->headers_ret = GST_FLOW_OK;
src->seekable = FALSE;
src->read_position = 0;
src->request_position = 0;
src->last_socket_read_time = 0;
g_cancellable_reset (src->cancellable);
- g_mutex_lock (&src->mutex);
- if (src->input_stream) {
- g_object_unref (src->input_stream);
- src->input_stream = NULL;
- }
- g_mutex_unlock (&src->mutex);
gst_caps_replace (&src->src_caps, NULL);
g_free (src->iradio_name);
{
const gchar *proxy;
- g_mutex_init (&src->mutex);
- g_cond_init (&src->have_headers_cond);
+ g_mutex_init (&src->session_mutex);
+ g_cond_init (&src->session_cond);
src->cancellable = g_cancellable_new ();
src->location = NULL;
src->redirection_uri = NULL;
src->iradio_mode = DEFAULT_IRADIO_MODE;
src->session = NULL;
src->external_session = NULL;
- src->forced_external_session = FALSE;
src->msg = NULL;
src->timeout = DEFAULT_TIMEOUT;
src->log_level = DEFAULT_SOUP_LOG_LEVEL;
gst_soup_http_src_session_close (src);
- if (src->external_session) {
- g_object_unref (src->external_session);
- src->external_session = NULL;
- }
+ g_clear_object (&src->external_session);
G_OBJECT_CLASS (parent_class)->dispose (gobject);
}
GST_DEBUG_OBJECT (src, "finalize");
- g_mutex_clear (&src->mutex);
- g_cond_clear (&src->have_headers_cond);
+ g_mutex_clear (&src->session_mutex);
+ g_cond_clear (&src->session_cond);
g_object_unref (src->cancellable);
g_free (src->location);
g_free (src->redirection_uri);
return gst_tag_freeform_string_to_utf8 (str, -1, env_vars);
}
-static void
-gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
-{
- g_cancellable_cancel (src->cancellable);
- g_cond_signal (&src->have_headers_cond);
-}
-
static gboolean
gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src, guint64 offset,
guint64 stop_offset)
return gst_structure_foreach (src->extra_headers, _append_extra_headers, src);
}
+static gpointer
+thread_func (gpointer user_data)
+{
+ GstSoupHTTPSrc *src = user_data;
+ GMainContext *ctx;
+
+ GST_DEBUG_OBJECT (src, "thread start");
+
+ ctx = g_main_loop_get_context (src->session->loop);
+
+ g_main_context_push_thread_default (ctx);
+
+ /* We explicitly set User-Agent to NULL here and overwrite it per message
+ * to be able to have the same session with different User-Agents per
+ * source */
+ src->session->session =
+ _soup_session_new_with_options ("user-agent", NULL,
+ "timeout", src->timeout, "tls-interaction", src->tls_interaction,
+ /* Unset the limit the number of maximum allowed connections */
+ "max-conns", src->session_is_shared ? G_MAXINT : 10,
+ "max-conns-per-host", src->session_is_shared ? G_MAXINT : 2, NULL);
+
+ if (!src->session->session) {
+ return NULL;
+ }
+
+ if (gst_soup_loader_get_api_version () == 3) {
+ if (src->proxy != NULL) {
+ GProxyResolver *proxy_resolver;
+ char *proxy_string = gst_soup_uri_to_string (src->proxy);
+ proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL);
+ g_free (proxy_string);
+ g_object_set (src->session->session, "proxy-resolver", proxy_resolver,
+ NULL);
+ g_object_unref (proxy_resolver);
+ }
+ } else {
+ g_object_set (src->session->session, "ssl-strict", src->ssl_strict, NULL);
+ if (src->proxy != NULL) {
+ g_object_set (src->session->session, "proxy-uri", src->proxy->soup_uri,
+ NULL);
+ }
+ }
+
+ gst_soup_util_log_setup (src->session->session, src->log_level,
+ GST_ELEMENT (src));
+ if (gst_soup_loader_get_api_version () < 3) {
+ _soup_session_add_feature_by_type (src->session->session,
+ _soup_content_decoder_get_type ());
+ }
+ _soup_session_add_feature_by_type (src->session->session,
+ _soup_cookie_jar_get_type ());
+
+ if (src->session_is_shared) {
+ GstContext *context;
+ GstMessage *message;
+ GstStructure *s;
+
+ GST_DEBUG_OBJECT (src, "Sharing session %p", src->session->session);
+
+ context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE);
+ s = gst_context_writable_structure (context);
+ gst_structure_set (s, "session", GST_TYPE_SOUP_SESSION, src->session, NULL);
+
+ /* during this time the src is locked by the parent thread,
+ * which is waiting, so this is safe to do
+ */
+ GST_OBJECT_UNLOCK (src);
+ gst_element_set_context (GST_ELEMENT_CAST (src), context);
+ message = gst_message_new_have_context (GST_OBJECT_CAST (src), context);
+ gst_element_post_message (GST_ELEMENT_CAST (src), message);
+ GST_OBJECT_LOCK (src);
+ } else {
+ src->session_is_shared = FALSE;
+ }
+
+ /* soup2: connect the authenticate handler for the src that spawned the
+ * session (i.e. the first owner); other users of this session will connect
+ * their own after fetching the external session; the callback will handle
+ * this correctly (it checks if the message belongs to the current src
+ * and exits early if it does not)
+ */
+ if (gst_soup_loader_get_api_version () < 3) {
+ g_signal_connect (src->session->session, "authenticate",
+ G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src);
+ }
+
+ if (!src->session_is_shared) {
+ if (src->tls_database)
+ g_object_set (src->session->session, "tls-database", src->tls_database,
+ NULL);
+ else if (gst_soup_loader_get_api_version () == 2) {
+ if (src->ssl_ca_file)
+ g_object_set (src->session->session, "ssl-ca-file", src->ssl_ca_file,
+ NULL);
+ else
+ g_object_set (src->session->session, "ssl-use-system-ca-file",
+ src->ssl_use_system_ca_file, NULL);
+ }
+ }
+
+ g_main_loop_run (src->session->loop);
+
+ g_main_context_pop_thread_default (ctx);
+
+ GST_DEBUG_OBJECT (src, "thread stop");
+
+ return NULL;
+}
+
+static gboolean
+_session_ready_cb (gpointer user_data)
+{
+ GstSoupHTTPSrc *src = user_data;
+
+ GST_DEBUG_OBJECT (src, "thread ready");
+
+ g_mutex_lock (&src->session_mutex);
+ g_cond_signal (&src->session_cond);
+ g_mutex_unlock (&src->session_mutex);
+
+ return FALSE;
+}
+
+/* called with session_mutex taken */
static gboolean
gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
{
- GProxyResolver *proxy_resolver;
+ GstQuery *query;
+ gboolean can_share;
if (src->session) {
GST_DEBUG_OBJECT (src, "Session is already open");
return FALSE;
}
- if (!src->session) {
- GstQuery *query;
- gboolean can_share = (src->timeout == DEFAULT_TIMEOUT)
- && (src->cookies == NULL)
- && (src->ssl_strict == DEFAULT_SSL_STRICT)
- && (src->tls_interaction == NULL) && (src->proxy == NULL)
- && (src->tls_database == DEFAULT_TLS_DATABASE);
-
- if (gst_soup_loader_get_api_version () == 2)
- can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) &&
- (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE);
-
- query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT);
- if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) {
- GstContext *context;
-
- gst_query_parse_context (query, &context);
- gst_element_set_context (GST_ELEMENT_CAST (src), context);
- } else {
- GstMessage *message;
+ can_share = (src->timeout == DEFAULT_TIMEOUT)
+ && (src->cookies == NULL)
+ && (src->ssl_strict == DEFAULT_SSL_STRICT)
+ && (src->tls_interaction == NULL) && (src->proxy == NULL)
+ && (src->tls_database == DEFAULT_TLS_DATABASE);
- message =
- gst_message_new_need_context (GST_OBJECT_CAST (src),
- GST_SOUP_SESSION_CONTEXT);
- gst_element_post_message (GST_ELEMENT_CAST (src), message);
- }
- gst_query_unref (query);
+ if (gst_soup_loader_get_api_version () == 2)
+ can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) &&
+ (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE);
- GST_OBJECT_LOCK (src);
- if (src->external_session && (can_share || src->forced_external_session)) {
- GST_DEBUG_OBJECT (src, "Using external session %p",
- src->external_session);
- src->session = g_object_ref (src->external_session);
- src->session_is_shared = TRUE;
- } else {
- GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share);
-
- /* We explicitly set User-Agent to NULL here and overwrite it per message
- * to be able to have the same session with different User-Agents per
- * source */
- src->session =
- _soup_session_new_with_options ("user-agent", NULL,
- "timeout", src->timeout, "tls-interaction", src->tls_interaction,
- /* Unset the limit the number of maximum allowed connections */
- "max-conns", can_share ? G_MAXINT : 10,
- "max-conns-per-host", can_share ? G_MAXINT : 2, NULL);
-
- if (gst_soup_loader_get_api_version () == 3) {
- if (src->proxy != NULL) {
- char *proxy_string = gst_soup_uri_to_string (src->proxy);
- proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL);
- g_free (proxy_string);
- g_object_set (src->session, "proxy-resolver", proxy_resolver, NULL);
- g_object_unref (proxy_resolver);
- }
- } else {
- g_object_set (src->session, "ssl-strict", src->ssl_strict, NULL);
- if (src->proxy != NULL) {
- g_object_set (src->session, "proxy-uri", src->proxy->soup_uri, NULL);
- }
- }
+ query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT);
+ if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) {
+ GstContext *context;
- if (src->session) {
- gst_soup_util_log_setup (src->session, src->log_level,
- GST_ELEMENT (src));
- if (gst_soup_loader_get_api_version () < 3) {
- _soup_session_add_feature_by_type (src->session,
- _soup_content_decoder_get_type ());
- }
- _soup_session_add_feature_by_type (src->session,
- _soup_cookie_jar_get_type ());
-
- if (can_share) {
- GstContext *context;
- GstMessage *message;
- GstStructure *s;
-
- GST_DEBUG_OBJECT (src, "Sharing session %p", src->session);
- src->session_is_shared = TRUE;
-
- context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE);
- s = gst_context_writable_structure (context);
- gst_structure_set (s, "session", _soup_session_get_type (),
- src->session, "force", G_TYPE_BOOLEAN, FALSE, NULL);
-
- gst_object_ref (src->session);
- GST_OBJECT_UNLOCK (src);
- gst_element_set_context (GST_ELEMENT_CAST (src), context);
- message =
- gst_message_new_have_context (GST_OBJECT_CAST (src), context);
- gst_element_post_message (GST_ELEMENT_CAST (src), message);
- GST_OBJECT_LOCK (src);
- gst_object_unref (src->session);
- } else {
- src->session_is_shared = FALSE;
- }
- }
- }
+ gst_query_parse_context (query, &context);
+ gst_element_set_context (GST_ELEMENT_CAST (src), context);
+ } else {
+ GstMessage *message;
- if (!src->session) {
- GST_ELEMENT_ERROR (src, LIBRARY, INIT,
- (NULL), ("Failed to create session"));
- GST_OBJECT_UNLOCK (src);
- return FALSE;
- }
+ message =
+ gst_message_new_need_context (GST_OBJECT_CAST (src),
+ GST_SOUP_SESSION_CONTEXT);
+ gst_element_post_message (GST_ELEMENT_CAST (src), message);
+ }
+ gst_query_unref (query);
+ GST_OBJECT_LOCK (src);
+
+ src->session_is_shared = can_share;
+
+ if (src->external_session && can_share) {
+ GST_DEBUG_OBJECT (src, "Using external session %p", src->external_session);
+ src->session = g_object_ref (src->external_session);
+ /* for soup2, connect another authenticate handler; see thread_func */
if (gst_soup_loader_get_api_version () < 3) {
- g_signal_connect (src->session, "authenticate",
+ g_signal_connect (src->session->session, "authenticate",
G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src);
}
+ } else {
+ GMainContext *ctx;
+ GSource *source;
- if (!src->session_is_shared) {
- if (src->tls_database)
- g_object_set (src->session, "tls-database", src->tls_database, NULL);
- else if (gst_soup_loader_get_api_version () == 2) {
- if (src->ssl_ca_file)
- g_object_set (src->session, "ssl-ca-file", src->ssl_ca_file, NULL);
- else
- g_object_set (src->session, "ssl-use-system-ca-file",
- src->ssl_use_system_ca_file, NULL);
- }
+ GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share);
+
+ src->session =
+ GST_SOUP_SESSION (g_object_new (GST_TYPE_SOUP_SESSION, NULL));
+
+ ctx = g_main_context_new ();
+
+ src->session->loop = g_main_loop_new (ctx, FALSE);
+ /* now owned by the loop */
+ g_main_context_unref (ctx);
+
+ src->session->thread = g_thread_try_new ("souphttpsrc-thread",
+ thread_func, src, NULL);
+
+ if (!src->session->thread) {
+ goto err;
}
- GST_OBJECT_UNLOCK (src);
- } else {
- GST_DEBUG_OBJECT (src, "Re-using session");
+
+ source = g_idle_source_new ();
+ g_source_set_callback (source, _session_ready_cb, src, NULL);
+ g_source_attach (source, ctx);
+ g_source_unref (source);
+
+ GST_DEBUG_OBJECT (src, "Waiting for thread to start...");
+ while (!g_main_loop_is_running (src->session->loop))
+ g_cond_wait (&src->session_cond, &src->session_mutex);
+ GST_DEBUG_OBJECT (src, "Soup thread started");
}
+ GST_OBJECT_UNLOCK (src);
+
return TRUE;
+
+err:
+ g_clear_object (&src->session);
+ GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), ("Failed to create session"));
+ GST_OBJECT_UNLOCK (src);
+
+ return FALSE;
+}
+
+static gboolean
+_session_close_cb (gpointer user_data)
+{
+ GstSoupHTTPSrc *src = user_data;
+
+ if (src->msg) {
+ gst_soup_session_cancel_message (src->session->session, src->msg,
+ src->cancellable);
+ g_clear_object (&src->msg);
+ }
+
+ if (src->session_is_shared)
+ _soup_session_abort (src->session->session);
+
+ /* there may be multiple of this callback attached to the session,
+ * each with different data pointer; disconnect the one we are closing
+ * the session for, leave the others alone
+ */
+ g_signal_handlers_disconnect_by_func (src->session->session,
+ G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src);
+
+ g_mutex_lock (&src->session_mutex);
+ g_clear_object (&src->session);
+ g_cond_signal (&src->session_cond);
+ g_mutex_unlock (&src->session_mutex);
+
+ return FALSE;
}
static void
gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
{
+ GSource *source;
+ GstSoupSession *sess;
+
GST_DEBUG_OBJECT (src, "Closing session");
- g_mutex_lock (&src->mutex);
- if (src->msg) {
- gst_soup_session_cancel_message (src->session, src->msg, src->cancellable);
- g_object_unref (src->msg);
- src->msg = NULL;
+ if (!src->session) {
+ return;
}
- if (src->session) {
- if (!src->session_is_shared)
- _soup_session_abort (src->session);
- g_signal_handlers_disconnect_by_func (src->session,
- G_CALLBACK (gst_soup_http_src_authenticate_cb), src);
- g_object_unref (src->session);
- src->session = NULL;
- }
+ /* ensure _session_close_cb does not deadlock us */
+ sess = g_object_ref (src->session);
+
+ source = g_idle_source_new ();
+
+ g_mutex_lock (&src->session_mutex);
+
+ g_source_set_callback (source, _session_close_cb, src, NULL);
+ g_source_attach (source, g_main_loop_get_context (src->session->loop));
+ g_source_unref (source);
+
+ while (src->session)
+ g_cond_wait (&src->session_cond, &src->session_mutex);
- g_mutex_unlock (&src->mutex);
+ g_mutex_unlock (&src->session_mutex);
+
+ /* finally dispose of our reference from the gst thread */
+ g_object_unref (sess);
}
static void
}
src->got_headers = TRUE;
- g_cond_broadcast (&src->have_headers_cond);
http_headers_event =
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, http_headers);
return TRUE;
}
-/* Lock taken */
+struct GstSoupSendSrc
+{
+ GstSoupHTTPSrc *src;
+ GError *error;
+};
+
+static void
+_session_send_cb (GObject * source, GAsyncResult * res, gpointer user_data)
+{
+ struct GstSoupSendSrc *msrc = user_data;
+ GstSoupHTTPSrc *src = msrc->src;
+ GError *error = NULL;
+
+ g_mutex_lock (&src->session_mutex);
+
+ src->input_stream = _soup_session_send_finish (src->session->session,
+ res, &error);
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ src->headers_ret = GST_FLOW_FLUSHING;
+ } else {
+ src->headers_ret = gst_soup_http_src_got_headers (src, src->msg);
+ }
+
+ if (!src->input_stream) {
+ GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message);
+ msrc->error = error;
+ }
+
+ g_cond_broadcast (&src->session_cond);
+ g_mutex_unlock (&src->session_mutex);
+}
+
+static gboolean
+_session_send_idle_cb (gpointer user_data)
+{
+ struct GstSoupSendSrc *msrc = user_data;
+ GstSoupHTTPSrc *src = msrc->src;
+
+ _soup_session_send_async (src->session->session, src->msg, src->cancellable,
+ _session_send_cb, msrc);
+
+ return FALSE;
+}
+
+/* called with session lock taken */
static GstFlowReturn
gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
{
GstFlowReturn ret;
- GError *error = NULL;
+ GSource *source;
+ struct GstSoupSendSrc msrc;
g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
g_assert (src->input_stream == NULL);
- src->input_stream =
- _soup_session_send (src->session, src->msg, src->cancellable, &error);
+ msrc.src = src;
+ msrc.error = NULL;
- if (error)
- GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message);
+ source = g_idle_source_new ();
- if (g_cancellable_is_cancelled (src->cancellable)) {
- ret = GST_FLOW_FLUSHING;
- goto done;
- }
+ src->headers_ret = GST_FLOW_OK;
+
+ g_source_set_callback (source, _session_send_idle_cb, &msrc, NULL);
+ g_source_attach (source, g_main_loop_get_context (src->session->loop));
+ g_source_unref (source);
+
+ while (!src->input_stream && !msrc.error)
+ g_cond_wait (&src->session_cond, &src->session_mutex);
+
+ ret = src->headers_ret;
- ret = gst_soup_http_src_got_headers (src, src->msg);
if (ret != GST_FLOW_OK) {
goto done;
}
if (!src->input_stream) {
- GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s", error->message);
+ GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s",
+ msrc.error->message);
ret = GST_FLOW_ERROR;
goto done;
}
- if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (src->msg))) {
- GST_DEBUG_OBJECT (src, "Successfully got a reply");
- } else {
- /* FIXME - be more helpful to people debugging */
- ret = GST_FLOW_ERROR;
- }
+ /* if an input stream exists, it was always successful */
+ GST_DEBUG_OBJECT (src, "Successfully got a reply");
done:
- if (error)
- g_error_free (error);
+ g_clear_error (&msrc.error);
return ret;
}
+/* called with session lock taken */
static GstFlowReturn
gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method)
{
}
}
+struct GstSoupReadResult
+{
+ GstSoupHTTPSrc *src;
+ GError *error;
+ void *buffer;
+ gsize bufsize;
+ gssize nbytes;
+};
+
+static void
+_session_read_cb (GObject * source, GAsyncResult * ret, gpointer user_data)
+{
+ struct GstSoupReadResult *res = user_data;
+
+ g_mutex_lock (&res->src->session_mutex);
+
+ res->nbytes = g_input_stream_read_finish (G_INPUT_STREAM (source),
+ ret, &res->error);
+
+ g_cond_signal (&res->src->session_cond);
+ g_mutex_unlock (&res->src->session_mutex);
+}
+
+static gboolean
+_session_read_idle_cb (gpointer user_data)
+{
+ struct GstSoupReadResult *res = user_data;
+
+ g_input_stream_read_async (res->src->input_stream, res->buffer,
+ res->bufsize, G_PRIORITY_DEFAULT, res->src->cancellable,
+ _session_read_cb, res);
+
+ return FALSE;
+}
+
static GstFlowReturn
gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
{
- gssize read_bytes;
+ struct GstSoupReadResult res;
GstMapInfo mapinfo;
GstBaseSrc *bsrc;
GstFlowReturn ret;
+ GSource *source;
bsrc = GST_BASE_SRC_CAST (src);
return GST_FLOW_ERROR;
}
- read_bytes =
- g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
- src->cancellable, NULL);
+ res.src = src;
+ res.buffer = mapinfo.data;
+ res.bufsize = mapinfo.size;
+ res.error = NULL;
+ res.nbytes = -1;
+
+ source = g_idle_source_new ();
+
+ g_mutex_lock (&src->session_mutex);
+
+ g_source_set_callback (source, _session_read_idle_cb, &res, NULL);
+ /* invoke on libsoup thread */
+ g_source_attach (source, g_main_loop_get_context (src->session->loop));
+ g_source_unref (source);
+
+ /* wait for it */
+ while (!res.error && res.nbytes < 0)
+ g_cond_wait (&src->session_cond, &src->session_mutex);
+ g_mutex_unlock (&src->session_mutex);
+
GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
- read_bytes);
+ res.nbytes);
- g_mutex_lock (&src->mutex);
- if (g_cancellable_is_cancelled (src->cancellable)) {
+ if (res.error) {
+ /* retry by default */
+ GstFlowReturn ret = GST_FLOW_CUSTOM_ERROR;
+ if (g_error_matches (res.error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ ret = GST_FLOW_FLUSHING;
+ } else {
+ GST_ERROR_OBJECT (src, "Got error from libsoup: %s", res.error->message);
+ }
+ g_error_free (res.error);
gst_buffer_unmap (*outbuf, &mapinfo);
gst_buffer_unref (*outbuf);
- g_mutex_unlock (&src->mutex);
- return GST_FLOW_FLUSHING;
+ return ret;
}
gst_buffer_unmap (*outbuf, &mapinfo);
- if (read_bytes > 0) {
- gst_buffer_set_size (*outbuf, read_bytes);
+ if (res.nbytes > 0) {
+ gst_buffer_set_size (*outbuf, res.nbytes);
GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position;
ret = GST_FLOW_OK;
- gst_soup_http_src_update_position (src, read_bytes);
+ gst_soup_http_src_update_position (src, res.nbytes);
/* Got some data, reset retry counter */
src->retry_count = 0;
- gst_soup_http_src_check_update_blocksize (src, read_bytes);
+ gst_soup_http_src_check_update_blocksize (src, res.nbytes);
src->last_socket_read_time = g_get_monotonic_time () * GST_USECOND;
* otherwise we would have to cancel the message and close the connection
*/
if (bsrc->segment.stop != -1
- && bsrc->segment.position + read_bytes >= bsrc->segment.stop) {
+ && bsrc->segment.position + res.nbytes >= bsrc->segment.stop) {
+ SoupMessage *msg = src->msg;
guint8 tmp[128];
- g_object_unref (src->msg);
+ res.buffer = tmp;
+ res.bufsize = sizeof (tmp);
+ res.nbytes = -1;
+
src->msg = NULL;
src->have_body = TRUE;
+ g_mutex_lock (&src->session_mutex);
+
+ source = g_idle_source_new ();
+
+ g_source_set_callback (source, _session_read_idle_cb, &res, NULL);
/* This should return immediately as we're at the end of the range */
- read_bytes =
- g_input_stream_read (src->input_stream, tmp, sizeof (tmp),
- src->cancellable, NULL);
- if (read_bytes > 0)
+ g_source_attach (source, g_main_loop_get_context (src->session->loop));
+ g_source_unref (source);
+
+ while (!res.error && res.nbytes < 0)
+ g_cond_wait (&src->session_cond, &src->session_mutex);
+ g_mutex_unlock (&src->session_mutex);
+
+ g_clear_error (&res.error);
+ g_object_unref (msg);
+
+ if (res.nbytes > 0)
GST_ERROR_OBJECT (src,
- "Read %" G_GSIZE_FORMAT " bytes after end of range", read_bytes);
+ "Read %" G_GSIZE_FORMAT " bytes after end of range", res.nbytes);
}
} else {
gst_buffer_unref (*outbuf);
- if (read_bytes < 0 ||
- (src->have_size && src->read_position < src->content_size)) {
+ if (src->have_size && src->read_position < src->content_size) {
/* Maybe the server disconnected, retry */
ret = GST_FLOW_CUSTOM_ERROR;
} else {
- g_object_unref (src->msg);
+ g_clear_object (&src->msg);
src->msg = NULL;
ret = GST_FLOW_EOS;
src->have_body = TRUE;
}
}
- g_mutex_unlock (&src->mutex);
+
+ g_clear_error (&res.error);
return ret;
}
+static gboolean
+_session_stream_clear_cb (gpointer user_data)
+{
+ GstSoupHTTPSrc *src = user_data;
+
+ g_mutex_lock (&src->session_mutex);
+
+ g_clear_object (&src->input_stream);
+
+ g_cond_signal (&src->session_cond);
+ g_mutex_unlock (&src->session_mutex);
+
+ return FALSE;
+}
+
+static void
+gst_soup_http_src_stream_clear (GstSoupHTTPSrc * src)
+{
+ GSource *source;
+
+ if (!src->input_stream)
+ return;
+
+ g_mutex_lock (&src->session_mutex);
+
+ source = g_idle_source_new ();
+
+ g_source_set_callback (source, _session_stream_clear_cb, src, NULL);
+ g_source_attach (source, g_main_loop_get_context (src->session->loop));
+ g_source_unref (source);
+
+ while (src->input_stream)
+ g_cond_wait (&src->session_cond, &src->session_mutex);
+
+ g_mutex_unlock (&src->session_mutex);
+}
+
static GstFlowReturn
gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
src = GST_SOUP_HTTP_SRC (psrc);
retry:
- g_mutex_lock (&src->mutex);
/* Check for pending position change */
- if (src->request_position != src->read_position) {
- if (src->input_stream) {
- g_input_stream_close (src->input_stream, src->cancellable, NULL);
- g_object_unref (src->input_stream);
- src->input_stream = NULL;
- }
+ if (src->request_position != src->read_position && src->input_stream) {
+ gst_soup_http_src_stream_clear (src);
}
if (g_cancellable_is_cancelled (src->cancellable)) {
ret = GST_FLOW_FLUSHING;
- g_mutex_unlock (&src->mutex);
goto done;
}
/* If we have no open connection to the server, start one */
if (!src->input_stream) {
*outbuf = NULL;
+ g_mutex_lock (&src->session_mutex);
ret =
gst_soup_http_src_do_request (src,
src->method ? src->method : SOUP_METHOD_GET);
http_headers_event = src->http_headers_event;
src->http_headers_event = NULL;
+ g_mutex_unlock (&src->session_mutex);
}
- g_mutex_unlock (&src->mutex);
if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) {
if (http_headers_event) {
if (http_headers_event)
gst_event_unref (http_headers_event);
- g_mutex_lock (&src->mutex);
if (src->input_stream) {
- g_object_unref (src->input_stream);
- src->input_stream = NULL;
+ gst_soup_http_src_stream_clear (src);
}
- g_mutex_unlock (&src->mutex);
if (ret == GST_FLOW_CUSTOM_ERROR) {
ret = GST_FLOW_OK;
goto retry;
}
if (ret == GST_FLOW_FLUSHING) {
- g_mutex_lock (&src->mutex);
src->retry_count = 0;
- g_mutex_unlock (&src->mutex);
}
return ret;
gst_soup_http_src_start (GstBaseSrc * bsrc)
{
GstSoupHTTPSrc *src = GST_SOUP_HTTP_SRC (bsrc);
+ gboolean ret;
GST_DEBUG_OBJECT (src, "start(\"%s\")", src->location);
- return gst_soup_http_src_session_open (src);
+ g_mutex_lock (&src->session_mutex);
+ ret = gst_soup_http_src_session_open (src);
+ g_mutex_unlock (&src->session_mutex);
+ return ret;
}
static gboolean
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "stop()");
+
+ gst_soup_http_src_stream_clear (src);
+
if (src->keep_alive && !src->msg && !src->session_is_shared)
- gst_soup_http_src_cancel_message (src);
+ g_cancellable_cancel (src->cancellable);
else
gst_soup_http_src_session_close (src);
const GstStructure *s = gst_context_get_structure (context);
GST_OBJECT_LOCK (src);
- if (src->external_session)
- g_object_unref (src->external_session);
- src->external_session = NULL;
- gst_structure_get (s, "session", _soup_session_get_type (),
+
+ g_clear_object (&src->external_session);
+ gst_structure_get (s, "session", GST_TYPE_SOUP_SESSION,
&src->external_session, NULL);
- src->forced_external_session = FALSE;
- gst_structure_get (s, "force", G_TYPE_BOOLEAN,
- &src->forced_external_session, NULL);
- GST_DEBUG_OBJECT (src, "Setting external session %p (force: %d)",
- src->external_session, src->forced_external_session);
+ GST_DEBUG_OBJECT (src, "Setting external session %p",
+ src->external_session);
GST_OBJECT_UNLOCK (src);
}
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock()");
- gst_soup_http_src_cancel_message (src);
+ g_cancellable_cancel (src->cancellable);
return TRUE;
}
* loops.
*/
if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) {
- g_mutex_lock (&src->mutex);
+ g_mutex_lock (&src->session_mutex);
while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable)
&& ret == GST_FLOW_OK) {
if ((src->msg && _soup_message_get_method (src->msg) != SOUP_METHOD_HEAD)) {
/* wait for the current request to finish */
- g_cond_wait (&src->have_headers_cond, &src->mutex);
+ g_cond_wait (&src->session_cond, &src->session_mutex);
+ ret = src->headers_ret;
} else {
if (gst_soup_http_src_session_open (src)) {
ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD);
}
}
}
- g_mutex_unlock (&src->mutex);
+ g_mutex_unlock (&src->session_mutex);
}
}