#endif
#include <gst/gstelement.h>
#include <gst/gst-i18n-plugin.h>
+#include <gio/gio.h>
#include <libsoup/soup.h>
#include "gstsouphttpsrc.h"
#include "gstsouputils.h"
-/* libsoup before 2.47.0 was stealing our main context from us,
- * so we can't reliable use it to clean up all pending resources
- * once we're done... let's just continue leaking on old versions.
- * https://bugzilla.gnome.org/show_bug.cgi?id=663944
- */
-#if defined(SOUP_MINOR_VERSION) && SOUP_MINOR_VERSION >= 47
-#define LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT 1
-#endif
-
#include <gst/tag/tag.h>
GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug);
static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src,
const gchar * method);
static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src);
-static void gst_soup_http_src_queue_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src,
guint64 offset, guint64 stop_offset);
-static void gst_soup_http_src_session_unpause_message (GstSoupHTTPSrc * src);
-static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src);
static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src);
static void gst_soup_http_src_parse_status (SoupMessage * msg,
GstSoupHTTPSrc * src);
-static void gst_soup_http_src_chunk_free (gpointer gstbuf);
-static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg,
- gsize max_len, gpointer user_data);
-static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg,
- SoupBuffer * chunk, GstSoupHTTPSrc * src);
-static void gst_soup_http_src_response_cb (SoupSession * session,
- SoupMessage * msg, GstSoupHTTPSrc * src);
-static void gst_soup_http_src_got_headers_cb (SoupMessage * msg,
- GstSoupHTTPSrc * src);
-static void gst_soup_http_src_got_body_cb (SoupMessage * msg,
- GstSoupHTTPSrc * src);
-static void gst_soup_http_src_finished_cb (SoupMessage * msg,
- GstSoupHTTPSrc * src);
+static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src,
+ SoupMessage * msg);
static void gst_soup_http_src_authenticate_cb (SoupSession * session,
SoupMessage * msg, SoupAuth * auth, gboolean retrying,
GstSoupHTTPSrc * src);
+static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
#define gst_soup_http_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
static void
gst_soup_http_src_reset (GstSoupHTTPSrc * src)
{
- src->interrupted = FALSE;
- src->retry = FALSE;
src->retry_count = 0;
src->have_size = FALSE;
src->got_headers = FALSE;
src->have_body = FALSE;
src->ret = GST_FLOW_OK;
+ g_cancellable_reset (src->cancellable);
+ gst_soup_http_src_destroy_input_stream (src);
gst_caps_replace (&src->src_caps, NULL);
g_free (src->iradio_name);
const gchar *proxy;
g_mutex_init (&src->mutex);
- g_cond_init (&src->request_finished_cond);
+ g_cond_init (&src->have_headers_cond);
+ src->cancellable = g_cancellable_new ();
+ src->poll_context = g_main_context_new ();
src->location = NULL;
src->redirection_uri = NULL;
src->automatic_redirect = TRUE;
src->proxy_pw = NULL;
src->cookies = NULL;
src->iradio_mode = DEFAULT_IRADIO_MODE;
- src->loop = NULL;
- src->context = NULL;
src->session = NULL;
src->msg = NULL;
src->timeout = DEFAULT_TIMEOUT;
GST_DEBUG_OBJECT (src, "finalize");
g_mutex_clear (&src->mutex);
- g_cond_clear (&src->request_finished_cond);
+ g_cond_clear (&src->have_headers_cond);
+ g_object_unref (src->cancellable);
+ g_main_context_unref (src->poll_context);
g_free (src->location);
g_free (src->redirection_uri);
g_free (src->user_agent);
}
static void
-gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
+gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
{
- if (src->msg != NULL) {
- GST_INFO_OBJECT (src, "Cancelling message");
- src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED;
- soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
+ if (src->input_stream) {
+ if (src->poll_source) {
+ g_source_destroy (src->poll_source);
+ g_source_unref (src->poll_source);
+ src->poll_source = NULL;
+ }
+ g_input_stream_close (src->input_stream, src->cancellable, NULL);
+ g_object_unref (src->input_stream);
+ src->input_stream = NULL;
}
- src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
- src->msg = NULL;
}
static void
-gst_soup_http_src_queue_message (GstSoupHTTPSrc * src)
+gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
{
- soup_session_queue_message (src->session, src->msg,
- (SoupSessionCallback) gst_soup_http_src_response_cb, src);
- src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED;
+ g_cancellable_cancel (src->cancellable);
+ g_cond_signal (&src->have_headers_cond);
}
static gboolean
return gst_structure_foreach (src->extra_headers, _append_extra_headers, src);
}
-
-static void
-gst_soup_http_src_session_unpause_message (GstSoupHTTPSrc * src)
-{
- soup_session_unpause_message (src->session, src->msg);
-}
-
-static void
-gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src)
-{
- soup_session_pause_message (src->session, src->msg);
-}
-
static gboolean
gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
{
return FALSE;
}
- if (!src->context)
- src->context = g_main_context_new ();
-
- if (!src->loop)
- src->loop = g_main_loop_new (src->context, TRUE);
- if (!src->loop) {
- GST_ELEMENT_ERROR (src, LIBRARY, INIT,
- (NULL), ("Failed to start GMainLoop"));
- g_main_context_unref (src->context);
- return FALSE;
- }
-
if (!src->session) {
GST_DEBUG_OBJECT (src, "Creating session");
if (src->proxy == NULL) {
src->session =
- soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
- src->context, SOUP_SESSION_USER_AGENT, src->user_agent,
- SOUP_SESSION_TIMEOUT, src->timeout,
+ soup_session_new_with_options (SOUP_SESSION_USER_AGENT,
+ src->user_agent, SOUP_SESSION_TIMEOUT, src->timeout,
SOUP_SESSION_SSL_STRICT, src->ssl_strict,
- SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_PROXY_RESOLVER_DEFAULT,
SOUP_SESSION_TLS_INTERACTION, src->tls_interaction, NULL);
} else {
src->session =
- soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
- src->context, SOUP_SESSION_PROXY_URI, src->proxy,
+ soup_session_new_with_options (SOUP_SESSION_PROXY_URI, src->proxy,
SOUP_SESSION_TIMEOUT, src->timeout,
SOUP_SESSION_SSL_STRICT, src->ssl_strict,
SOUP_SESSION_USER_AGENT, src->user_agent,
return TRUE;
}
-#ifdef LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT
-static gboolean
-dummy_idle_cb (gpointer data)
-{
- return FALSE /* Idle source is removed */ ;
-}
-#endif
-
static void
gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
{
GST_DEBUG_OBJECT (src, "Closing session");
- if (src->loop)
- g_main_loop_quit (src->loop);
-
g_mutex_lock (&src->mutex);
if (src->session) {
soup_session_abort (src->session); /* This unrefs the message. */
src->session = NULL;
src->msg = NULL;
}
- if (src->loop) {
-#ifdef LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT
- GSource *idle_source;
-
- /* Iterating the main context to give GIO cancellables a chance
- * to initiate cleanups. Wihout this, resources allocated by
- * libsoup for the connection are not released and socket fd is
- * leaked. */
- idle_source = g_idle_source_new ();
- /* Suppressing "idle souce without callback" warning */
- g_source_set_callback (idle_source, dummy_idle_cb, NULL, NULL);
- g_source_set_priority (idle_source, G_PRIORITY_LOW);
- g_source_attach (idle_source, src->context);
- /* Acquiring the context. Idle source guarantees that we'll not block. */
- g_main_context_push_thread_default (src->context);
- g_main_context_iteration (src->context, TRUE);
- /* Ensuring that there's no unhandled pending events left. */
- while (g_main_context_iteration (src->context, FALSE));
- g_main_context_pop_thread_default (src->context);
- g_source_unref (idle_source);
-#endif
-
- g_main_loop_unref (src->loop);
- g_main_context_unref (src->context);
- src->loop = NULL;
- src->context = NULL;
- }
g_mutex_unlock (&src->mutex);
}
}
static void
-gst_soup_http_src_got_headers_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
+gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
{
const char *value;
GstTagList *tag_list;
return;
}
- if (msg->status_code == SOUP_STATUS_UNAUTHORIZED)
+ if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
+ /* force an error */
+ gst_soup_http_src_parse_status (msg, src);
return;
+ }
- src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING;
src->got_headers = TRUE;
+ g_cond_broadcast (&src->have_headers_cond);
http_headers = gst_structure_new_empty ("http-headers");
gst_structure_set (http_headers, "uri", G_TYPE_STRING, src->location, NULL);
if (param != NULL)
rate = atol (param);
- src->src_caps = gst_caps_new_simple ("audio/x-raw",
+ src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
"format", G_TYPE_STRING, "S16BE",
"layout", G_TYPE_STRING, "interleaved",
"channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
* GST_FLOW_ERROR from the create function instead of having
* got_chunk_cb overwrite src->ret with FLOW_OK again. */
if (src->ret == GST_FLOW_ERROR || src->ret == GST_FLOW_EOS) {
- gst_soup_http_src_session_pause_message (src);
-
- if (src->loop)
- g_main_loop_quit (src->loop);
- }
- g_cond_signal (&src->request_finished_cond);
-}
-
-/* Have body. Signal EOS. */
-static void
-gst_soup_http_src_got_body_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
-{
- if (G_UNLIKELY (msg != src->msg)) {
- GST_DEBUG_OBJECT (src, "got body, but not for current message");
- return;
- }
- if (G_UNLIKELY (src->session_io_status !=
- GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
- /* Probably a redirect. */
- return;
}
- GST_DEBUG_OBJECT (src, "got body");
- src->ret = GST_FLOW_EOS;
- src->have_body = TRUE;
-
- /* no need to interrupt the message here, we do it on the
- * finished_cb anyway if needed. And getting the body might mean
- * that the connection was hang up before finished. This happens when
- * the pipeline is stalled for too long (long pauses during playback).
- * Best to let it continue from here and pause because it reached the
- * final bytes based on content_size or received an out of range error */
-}
-
-/* Finished. Signal EOS. */
-static void
-gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
-{
- if (G_UNLIKELY (msg != src->msg)) {
- GST_DEBUG_OBJECT (src, "finished, but not for current message");
- return;
- }
- GST_INFO_OBJECT (src, "finished, io status: %d", src->session_io_status);
- src->ret = GST_FLOW_EOS;
- if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED) {
- /* gst_soup_http_src_cancel_message() triggered this; probably a seek
- * that occurred in the QUEUEING state; i.e. before the connection setup
- * was complete. Do nothing */
- GST_DEBUG_OBJECT (src, "cancelled");
- } else if (src->session_io_status ==
- GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING && src->read_position > 0 &&
- (src->have_size && src->read_position < src->content_size) &&
- (src->max_retries == -1 || src->retry_count < src->max_retries)) {
- /* The server disconnected while streaming. Reconnect and seeking to the
- * last location. */
- src->retry = TRUE;
- src->retry_count++;
- src->ret = GST_FLOW_CUSTOM_ERROR;
- } else if (G_UNLIKELY (src->session_io_status !=
- GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
- if (msg->method == SOUP_METHOD_HEAD) {
- GST_DEBUG_OBJECT (src, "Ignoring error %d:%s during HEAD request",
- msg->status_code, msg->reason_phrase);
- } else {
- gst_soup_http_src_parse_status (msg, src);
- }
- }
- if (src->loop)
- g_main_loop_quit (src->loop);
- g_cond_signal (&src->request_finished_cond);
-}
-
-/* Buffer lifecycle management.
- *
- * gst_soup_http_src_create() runs the GMainLoop for this element, to let
- * Soup take control.
- * A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and
- * associated with a SoupBuffer.
- * Soup reads HTTP data in the GstBuffer's data buffer.
- * The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer.
- * That sets gst_soup_http_src_create()'s return argument to the GstBuffer,
- * increments its refcount (to 2), pauses the flow of data from the HTTP
- * source to prevent gst_soup_http_src_got_chunk_cb() from being called
- * again and breaks out of the GMainLoop.
- * Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the
- * SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the
- * refcount (to 1).
- * gst_soup_http_src_create() returns the GstBuffer. It will be freed by a
- * downstream element.
- * If Soup fails to read HTTP data, it does not call
- * gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and
- * calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's
- * refcount to 0, freeing it.
- */
-
-typedef struct
-{
- GstBuffer *buffer;
- GstMapInfo map;
-} SoupGstChunk;
-
-static void
-gst_soup_http_src_chunk_free (gpointer user_data)
-{
- SoupGstChunk *chunk = (SoupGstChunk *) user_data;
-
- gst_buffer_unmap (chunk->buffer, &chunk->map);
- gst_buffer_unref (chunk->buffer);
- g_slice_free (SoupGstChunk, chunk);
}
-static SoupBuffer *
-gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len,
- gpointer user_data)
+static GstBuffer *
+gst_soup_http_src_alloc_buffer (GstSoupHTTPSrc * src)
{
- GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data;
GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
- GstBuffer *gstbuf;
- SoupBuffer *soupbuf;
- gsize length;
GstFlowReturn rc;
- SoupGstChunk *chunk;
-
- if (max_len)
- length = MIN (basesrc->blocksize, max_len);
- else
- length = basesrc->blocksize;
- GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT,
- length, max_len);
+ GstBuffer *gstbuf;
- rc = GST_BASE_SRC_CLASS (parent_class)->alloc (basesrc, -1, length, &gstbuf);
+ rc = GST_BASE_SRC_CLASS (parent_class)->alloc (basesrc, -1,
+ basesrc->blocksize, &gstbuf);
if (G_UNLIKELY (rc != GST_FLOW_OK)) {
- /* Failed to allocate buffer. Stall SoupSession and return error code
- * to create(). */
- src->ret = rc;
- g_main_loop_quit (src->loop);
return NULL;
}
- chunk = g_slice_new0 (SoupGstChunk);
- chunk->buffer = gstbuf;
- gst_buffer_map (gstbuf, &chunk->map, GST_MAP_READWRITE);
-
- soupbuf = soup_buffer_new_with_owner (chunk->map.data, chunk->map.size,
- chunk, gst_soup_http_src_chunk_free);
-
- return soupbuf;
-}
-
-static void
-gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
- GstSoupHTTPSrc * src)
-{
- GstBaseSrc *basesrc;
- guint64 new_position;
- SoupGstChunk *gchunk;
-
- if (G_UNLIKELY (msg != src->msg)) {
- GST_DEBUG_OBJECT (src, "got chunk, but not for current message");
- return;
- }
- if (G_UNLIKELY (!src->outbuf)) {
- GST_DEBUG_OBJECT (src, "got chunk but we're not expecting one");
- src->ret = GST_FLOW_OK;
- gst_soup_http_src_cancel_message (src);
- g_main_loop_quit (src->loop);
- return;
- }
-
- /* We got data, reset the retry counter */
- src->retry_count = 0;
-
- src->have_body = FALSE;
- if (G_UNLIKELY (src->session_io_status !=
- GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
- /* Probably a redirect. */
- return;
- }
- basesrc = GST_BASE_SRC_CAST (src);
- GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes",
- chunk->length);
-
- /* Extract the GstBuffer from the SoupBuffer and set its fields. */
- gchunk = (SoupGstChunk *) soup_buffer_get_owner (chunk);
- *src->outbuf = gchunk->buffer;
-
- gst_buffer_resize (*src->outbuf, 0, chunk->length);
- GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.position;
-
- gst_buffer_ref (*src->outbuf);
-
- new_position = src->read_position + chunk->length;
- if (G_LIKELY (src->request_position == src->read_position))
- src->request_position = new_position;
- src->read_position = new_position;
-
- if (src->have_size) {
- if (new_position > src->content_size) {
- GST_DEBUG_OBJECT (src, "Got position previous estimated content size "
- "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", new_position,
- src->content_size);
- src->content_size = new_position;
- basesrc->segment.duration = src->content_size;
- gst_element_post_message (GST_ELEMENT (src),
- gst_message_new_duration_changed (GST_OBJECT (src)));
- } else if (new_position == src->content_size) {
- GST_DEBUG_OBJECT (src, "We're EOS now");
- }
- }
-
- src->ret = GST_FLOW_OK;
- g_main_loop_quit (src->loop);
- gst_soup_http_src_session_pause_message (src);
-}
-
-static void
-gst_soup_http_src_response_cb (SoupSession * session, SoupMessage * msg,
- GstSoupHTTPSrc * src)
-{
- if (G_UNLIKELY (msg != src->msg)) {
- GST_DEBUG_OBJECT (src, "got response %d: %s, but not for current message",
- msg->status_code, msg->reason_phrase);
- return;
- }
- if (G_UNLIKELY (src->session_io_status !=
- GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)
- && SOUP_STATUS_IS_REDIRECTION (msg->status_code)) {
- /* Ignore redirections. */
- return;
- }
- GST_INFO_OBJECT (src, "got response %d: %s", msg->status_code,
- msg->reason_phrase);
- if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING &&
- src->read_position > 0 && (src->have_size
- && src->read_position < src->content_size) &&
- (src->max_retries == -1 || src->retry_count < src->max_retries)) {
- /* The server disconnected while streaming. Reconnect and seeking to the
- * last location. */
- src->retry = TRUE;
- src->retry_count++;
- } else {
- gst_soup_http_src_parse_status (msg, src);
- }
- /* The session's SoupMessage object expires after this callback returns. */
- src->msg = NULL;
- g_main_loop_quit (src->loop);
+ return gstbuf;
}
#define SOUP_HTTP_SRC_ERROR(src,soup_msg,cat,code,error_message) \
break;
case SOUP_STATUS_IO_ERROR:
if (src->max_retries == -1 || src->retry_count < src->max_retries) {
- src->retry = TRUE;
- src->retry_count++;
src->ret = GST_FLOW_CUSTOM_ERROR;
} else {
SOUP_HTTP_SRC_ERROR (src, msg, RESOURCE, READ,
("Error parsing URL."), ("URL: %s", src->location));
return FALSE;
}
- src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
if (!src->keep_alive) {
soup_message_headers_append (src->msg->request_headers, "Connection",
"close");
*cookie);
}
}
- src->retry = FALSE;
- g_signal_connect (src->msg, "got_headers",
- G_CALLBACK (gst_soup_http_src_got_headers_cb), src);
- g_signal_connect (src->msg, "got_body",
- G_CALLBACK (gst_soup_http_src_got_body_cb), src);
- g_signal_connect (src->msg, "finished",
- G_CALLBACK (gst_soup_http_src_finished_cb), src);
- g_signal_connect (src->msg, "got_chunk",
- G_CALLBACK (gst_soup_http_src_got_chunk_cb), src);
soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS |
(src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
- soup_message_set_chunk_allocator (src->msg,
- gst_soup_http_src_chunk_allocator, src, NULL);
gst_soup_http_src_add_range_header (src, src->request_position,
src->stop_position);
return TRUE;
}
+static void
+gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
+{
+ if (!src->input_stream)
+ return;
+
+ src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
+ && g_pollable_input_stream_can_poll ((GPollableInputStream *)
+ src->input_stream);
+}
+
static GstFlowReturn
-gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method,
- GstBuffer ** outbuf)
+gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
{
- /* If we're not OK, just go out of here */
+ g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
+
+ g_assert (src->input_stream == NULL);
+ g_assert (src->poll_source == NULL);
+
+ /* FIXME We are ignoring the GError here, might be useful to debug */
+ src->input_stream =
+ soup_session_send (src->session, src->msg, src->cancellable, NULL);
+
+ if (g_cancellable_is_cancelled (src->cancellable))
+ return GST_FLOW_FLUSHING;
+
+ gst_soup_http_src_got_headers (src, src->msg);
if (src->ret != GST_FLOW_OK) {
- GST_DEBUG_OBJECT (src, "Previous flow return not OK: %s",
- gst_flow_get_name (src->ret));
return src->ret;
}
+ if (!src->input_stream) {
+ GST_DEBUG_OBJECT (src, "Didn't get an input stream");
+ return GST_FLOW_ERROR;
+ }
+
+ if (SOUP_STATUS_IS_SUCCESSFUL (src->msg->status_code)) {
+ GST_DEBUG_OBJECT (src, "Successfully got a reply");
+ } else {
+ /* FIXME - be more helpful to people debugging */
+ return GST_FLOW_ERROR;
+ }
+
+ gst_soup_http_src_check_input_stream_interfaces (src);
+
+ return GST_FLOW_OK;
+}
+
+static GstFlowReturn
+gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method)
+{
+ if (src->max_retries != -1 && src->retry_count > src->max_retries) {
+ GST_DEBUG_OBJECT (src, "Max retries reached");
+ src->ret = GST_FLOW_ERROR;
+ return src->ret;
+ }
+
+ src->retry_count++;
+ /* EOS immediately if we have an empty segment */
+ if (src->request_position == src->stop_position)
+ return GST_FLOW_EOS;
+
GST_LOG_OBJECT (src, "Running request for method: %s", method);
+
+ /* Update the position if we are retrying */
if (src->msg && (src->request_position != src->read_position)) {
- if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE) {
- /* EOS immediately if we have an empty segment */
- if (src->request_position == src->stop_position)
- return GST_FLOW_EOS;
+ gst_soup_http_src_add_range_header (src, src->request_position,
+ src->stop_position);
+ }
- gst_soup_http_src_add_range_header (src, src->request_position,
- src->stop_position);
- } else {
- GST_DEBUG_OBJECT (src, "Seek from position %" G_GUINT64_FORMAT
- " to %" G_GUINT64_FORMAT ": requeueing connection request",
- src->read_position, src->request_position);
- gst_soup_http_src_cancel_message (src);
+ if (!src->msg) {
+ if (!gst_soup_http_src_build_message (src, method)) {
+ return GST_FLOW_ERROR;
}
}
- if (!src->msg) {
- /* EOS immediately if we have an empty segment */
- if (src->request_position == src->stop_position)
- return GST_FLOW_EOS;
- if (!gst_soup_http_src_build_message (src, method))
- return GST_FLOW_ERROR;
+ if (g_cancellable_is_cancelled (src->cancellable)) {
+ GST_INFO_OBJECT (src, "interrupted");
+ src->ret = GST_FLOW_FLUSHING;
+ goto done;
}
+ src->ret = gst_soup_http_src_send_message (src);
- src->ret = GST_FLOW_CUSTOM_ERROR;
- src->outbuf = outbuf;
- do {
- if (src->interrupted) {
- GST_INFO_OBJECT (src, "interrupted");
- src->ret = GST_FLOW_FLUSHING;
- break;
- }
- if (src->retry) {
- GST_INFO_OBJECT (src, "Reconnecting");
+done:
+ return src->ret;
+}
- /* EOS immediately if we have an empty segment */
- if (src->request_position == src->stop_position)
- return GST_FLOW_EOS;
+static void
+gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
+{
+ GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
+ guint64 new_position;
- if (!gst_soup_http_src_build_message (src, method))
- return GST_FLOW_ERROR;
- src->retry = FALSE;
- continue;
- }
- if (!src->msg) {
- GST_DEBUG_OBJECT (src, "EOS reached");
- break;
- }
+ new_position = src->read_position + bytes_read;
+ if (G_LIKELY (src->request_position == src->read_position))
+ src->request_position = new_position;
+ src->read_position = new_position;
- switch (src->session_io_status) {
- case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE:
- GST_INFO_OBJECT (src, "Queueing connection request");
- gst_soup_http_src_queue_message (src);
- break;
- case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED:
- break;
- case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING:
- gst_soup_http_src_session_unpause_message (src);
- break;
- case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED:
- /* Impossible. */
- break;
+ if (src->have_size) {
+ if (new_position > src->content_size) {
+ GST_DEBUG_OBJECT (src, "Got position previous estimated content size "
+ "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", new_position,
+ src->content_size);
+ src->content_size = new_position;
+ basesrc->segment.duration = src->content_size;
+ gst_element_post_message (GST_ELEMENT (src),
+ gst_message_new_duration_changed (GST_OBJECT (src)));
+ } else if (new_position == src->content_size) {
+ GST_DEBUG_OBJECT (src, "We're EOS now");
}
+ }
+}
- if (src->ret == GST_FLOW_CUSTOM_ERROR) {
- g_main_context_push_thread_default (src->context);
- g_main_loop_run (src->loop);
- g_main_context_pop_thread_default (src->context);
- }
+static gboolean
+_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
+ gpointer udata)
+{
+ GstSoupHTTPSrc *src = udata;
- } while (src->ret == GST_FLOW_CUSTOM_ERROR);
+ src->have_data = TRUE;
+ return TRUE;
+}
- /* Let the request finish if we had a stop position and are there */
- if (src->ret == GST_FLOW_OK && src->stop_position != -1
- && src->read_position >= src->stop_position) {
- src->outbuf = NULL;
- gst_soup_http_src_session_unpause_message (src);
- g_main_context_push_thread_default (src->context);
- g_main_loop_run (src->loop);
- g_main_context_pop_thread_default (src->context);
+/* Need to wait on a gsource to know when data is available */
+static gboolean
+gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
+{
+ src->have_data = FALSE;
- g_cond_signal (&src->request_finished_cond);
- /* Return OK unconditionally here, src->ret will
- * be most likely be EOS now but we want to
- * consume the buffer we got above */
- return GST_FLOW_OK;
+ if (!src->poll_source) {
+ src->poll_source =
+ g_pollable_input_stream_create_source ((GPollableInputStream *)
+ src->input_stream, src->cancellable);
+ g_source_set_callback (src->poll_source,
+ (GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
+ g_source_attach (src->poll_source, src->poll_context);
}
- if (src->ret == GST_FLOW_CUSTOM_ERROR)
- src->ret = GST_FLOW_EOS;
- g_cond_signal (&src->request_finished_cond);
+ while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
+ g_main_context_iteration (src->poll_context, TRUE);
+ }
- /* basesrc assumes that we don't return a buffer if
- * something else than OK is returned. It will just
- * leak any buffer we might accidentially provide
- * here.
- *
- * This can potentially happen during flushing.
- */
- if (src->ret != GST_FLOW_OK && outbuf && *outbuf) {
+ return src->have_data;
+}
+
+static GstFlowReturn
+gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
+{
+ gssize read_bytes;
+ GstMapInfo mapinfo;
+ GstBaseSrc *bsrc;
+ GstFlowReturn ret;
+ GError *err = NULL;
+
+ bsrc = GST_BASE_SRC_CAST (src);
+
+ *outbuf = gst_soup_http_src_alloc_buffer (src);
+ if (!*outbuf) {
+ GST_WARNING_OBJECT (src, "Failed to allocate buffer");
+ return GST_FLOW_ERROR;
+ }
+
+ if (!gst_buffer_map (*outbuf, &mapinfo, GST_MAP_WRITE)) {
+ GST_WARNING_OBJECT (src, "Failed to map buffer");
+ return GST_FLOW_ERROR;
+ }
+
+ if (src->has_pollable_interface) {
+ while (1) {
+ read_bytes =
+ g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
+ src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
+ &err);
+ if (read_bytes == -1) {
+ if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_error_free (err);
+ err = NULL;
+
+ /* no data yet, wait */
+ if (gst_soup_http_src_wait_for_data (src))
+ /* retry */
+ continue;
+ }
+ }
+ break;
+ }
+ } else {
+ read_bytes =
+ g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
+ src->cancellable, NULL);
+ }
+
+ if (err)
+ g_error_free (err);
+
+ GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
+ read_bytes);
+
+ g_mutex_lock (&src->mutex);
+ if (g_cancellable_is_cancelled (src->cancellable)) {
+ gst_buffer_unmap (*outbuf, &mapinfo);
gst_buffer_unref (*outbuf);
- *outbuf = NULL;
+ g_mutex_unlock (&src->mutex);
+ return GST_FLOW_FLUSHING;
}
+ g_mutex_unlock (&src->mutex);
- return src->ret;
+ gst_buffer_unmap (*outbuf, &mapinfo);
+ if (read_bytes > 0) {
+ gst_buffer_set_size (*outbuf, read_bytes);
+ GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position;
+ ret = GST_FLOW_OK;
+ gst_soup_http_src_update_position (src, read_bytes);
+
+ /* Got some data, reset retry counter */
+ src->retry_count = 0;
+ } else {
+ gst_buffer_unref (*outbuf);
+ if (read_bytes < 0) {
+ /* Maybe the server disconnected, retry */
+ ret = GST_FLOW_CUSTOM_ERROR;
+ } else {
+ ret = GST_FLOW_EOS;
+ src->have_body = TRUE;
+ }
+ }
+ return ret;
}
static GstFlowReturn
gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstSoupHTTPSrc *src;
- GstFlowReturn ret;
- GstEvent *http_headers_event;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstEvent *http_headers_event = NULL;
src = GST_SOUP_HTTP_SRC (psrc);
+retry:
g_mutex_lock (&src->mutex);
- *outbuf = NULL;
- ret =
- gst_soup_http_src_do_request (src,
- src->method ? src->method : SOUP_METHOD_GET, outbuf);
- http_headers_event = src->http_headers_event;
- src->http_headers_event = NULL;
+
+ /* Check for pending position change */
+ if (src->request_position != src->read_position) {
+ gst_soup_http_src_destroy_input_stream (src);
+ }
+
+ if (g_cancellable_is_cancelled (src->cancellable)) {
+ ret = GST_FLOW_FLUSHING;
+ g_mutex_unlock (&src->mutex);
+ goto done;
+ }
+
+ /* If we have no open connection to the server, start one */
+ if (!src->input_stream) {
+ *outbuf = NULL;
+ ret =
+ gst_soup_http_src_do_request (src,
+ src->method ? src->method : SOUP_METHOD_GET);
+ http_headers_event = src->http_headers_event;
+ src->http_headers_event = NULL;
+ }
g_mutex_unlock (&src->mutex);
- if (http_headers_event)
- gst_pad_push_event (GST_BASE_SRC_PAD (src), http_headers_event);
+ if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) {
+ if (http_headers_event) {
+ gst_pad_push_event (GST_BASE_SRC_PAD (src), http_headers_event);
+ http_headers_event = NULL;
+ }
+ }
+ if (ret == GST_FLOW_OK)
+ ret = gst_soup_http_src_read_buffer (src, outbuf);
+
+done:
+ GST_DEBUG_OBJECT (src, "Returning %d %s", ret, gst_flow_get_name (ret));
+ if (ret != GST_FLOW_OK) {
+ if (http_headers_event)
+ gst_event_unref (http_headers_event);
+
+ g_mutex_lock (&src->mutex);
+ gst_soup_http_src_destroy_input_stream (src);
+ g_mutex_unlock (&src->mutex);
+ if (ret == GST_FLOW_CUSTOM_ERROR)
+ goto retry;
+ }
return ret;
}
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock()");
- src->interrupted = TRUE;
src->ret = GST_FLOW_FLUSHING;
- if (src->loop)
- g_main_loop_quit (src->loop);
- g_cond_signal (&src->request_finished_cond);
+ gst_soup_http_src_cancel_message (src);
return TRUE;
}
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock_stop()");
- src->interrupted = FALSE;
src->ret = GST_FLOW_OK;
+ g_cancellable_reset (src->cancellable);
return TRUE;
}
*/
if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) {
g_mutex_lock (&src->mutex);
- while (!src->got_headers && !src->interrupted && ret == GST_FLOW_OK) {
- if ((src->msg && src->msg->method != SOUP_METHOD_HEAD) &&
- src->session_io_status != GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE) {
+ while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable)
+ && ret == GST_FLOW_OK) {
+ if ((src->msg && src->msg->method != SOUP_METHOD_HEAD)) {
/* wait for the current request to finish */
- g_cond_wait (&src->request_finished_cond, &src->mutex);
+ g_cond_wait (&src->have_headers_cond, &src->mutex);
} else {
if (gst_soup_http_src_session_open (src)) {
- ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD, NULL);
+ ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD);
}
}
}
/* A HEAD request shouldn't lead to EOS */
src->ret = GST_FLOW_OK;
}
- /* resets status to idle */
- gst_soup_http_src_cancel_message (src);
g_mutex_unlock (&src->mutex);
}
}