#endif
#include <gst/gstelement.h>
#include <gst/gst-i18n-plugin.h>
-#include <gio/gio.h>
#include <libsoup/soup.h>
#include "gstsouphttpsrc.h"
#include "gstsouputils.h"
#define DEFAULT_RETRIES 3
#define DEFAULT_SOUP_METHOD NULL
+#define GROW_BLOCKSIZE_LIMIT 1
+#define GROW_BLOCKSIZE_COUNT 1
+#define GROW_BLOCKSIZE_FACTOR 2
+#define REDUCE_BLOCKSIZE_LIMIT 0.20
+#define REDUCE_BLOCKSIZE_COUNT 2
+#define REDUCE_BLOCKSIZE_FACTOR 0.5
+
static void gst_soup_http_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
static void gst_soup_http_src_finalize (GObject * gobject);
static void gst_soup_http_src_authenticate_cb (SoupSession * session,
SoupMessage * msg, SoupAuth * auth, gboolean retrying,
GstSoupHTTPSrc * src);
-static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
#define gst_soup_http_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
src->content_size = 0;
src->have_body = FALSE;
+ src->reduce_blocksize_count = 0;
+ src->increase_blocksize_count = 0;
+
src->ret = GST_FLOW_OK;
g_cancellable_reset (src->cancellable);
- gst_soup_http_src_destroy_input_stream (src);
+ if (src->input_stream) {
+ g_object_unref (src->input_stream);
+ src->input_stream = NULL;
+ }
gst_caps_replace (&src->src_caps, NULL);
g_free (src->iradio_name);
g_mutex_init (&src->mutex);
g_cond_init (&src->have_headers_cond);
src->cancellable = g_cancellable_new ();
- src->poll_context = g_main_context_new ();
src->location = NULL;
src->redirection_uri = NULL;
src->automatic_redirect = TRUE;
src->tls_interaction = DEFAULT_TLS_INTERACTION;
src->max_retries = DEFAULT_RETRIES;
src->method = DEFAULT_SOUP_METHOD;
+ src->minimum_blocksize = gst_base_src_get_blocksize (GST_BASE_SRC_CAST (src));
proxy = g_getenv ("http_proxy");
if (!gst_soup_http_src_set_proxy (src, proxy)) {
GST_WARNING_OBJECT (src,
g_mutex_clear (&src->mutex);
g_cond_clear (&src->have_headers_cond);
g_object_unref (src->cancellable);
- g_main_context_unref (src->poll_context);
g_free (src->location);
g_free (src->redirection_uri);
g_free (src->user_agent);
}
static void
-gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
-{
- if (src->input_stream) {
- if (src->poll_source) {
- g_source_destroy (src->poll_source);
- g_source_unref (src->poll_source);
- src->poll_source = NULL;
- }
- g_input_stream_close (src->input_stream, src->cancellable, NULL);
- g_object_unref (src->input_stream);
- src->input_stream = NULL;
- }
-}
-
-static void
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
{
g_cancellable_cancel (src->cancellable);
GST_DEBUG_OBJECT (src, "Closing session");
g_mutex_lock (&src->mutex);
+ if (src->msg) {
+ soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
+ g_object_unref (src->msg);
+ src->msg = NULL;
+ }
+
if (src->session) {
- soup_session_abort (src->session); /* This unrefs the message. */
+ soup_session_abort (src->session);
g_object_unref (src->session);
src->session = NULL;
- src->msg = NULL;
}
g_mutex_unlock (&src->mutex);
}
GstStructure *headers = user_data;
const GValue *gv;
+ if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
+ return;
+
gv = gst_structure_get_value (headers, name);
if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
GValue v = G_VALUE_INIT;
return;
if (src->automatic_redirect && SOUP_STATUS_IS_REDIRECTION (msg->status_code)) {
- src->redirection_uri = g_strdup (soup_message_headers_get_one
- (msg->response_headers, "Location"));
- src->redirection_permanent =
- (msg->status_code == SOUP_STATUS_MOVED_PERMANENTLY);
- GST_DEBUG_OBJECT (src, "%u redirect to \"%s\" (permanent %d)",
- msg->status_code, src->redirection_uri, src->redirection_permanent);
- return;
+ const gchar *location;
+
+ location = soup_message_headers_get_one (msg->response_headers, "Location");
+
+ if (location) {
+ if (!g_utf8_validate (location, -1, NULL)) {
+ GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, SEEK,
+ (_("Corrupted HTTP response.")),
+ ("Location header is not valid UTF-8"),
+ ("http-status-code", G_TYPE_UINT, msg->status_code,
+ "http-redirection-uri", G_TYPE_STRING,
+ GST_STR_NULL (src->redirection_uri), NULL));
+ src->ret = GST_FLOW_ERROR;
+ return;
+ }
+
+ src->redirection_uri = g_strdup (location);
+
+ src->redirection_permanent =
+ (msg->status_code == SOUP_STATUS_MOVED_PERMANENTLY);
+ GST_DEBUG_OBJECT (src, "%u redirect to \"%s\" (permanent %d)",
+ msg->status_code, src->redirection_uri, src->redirection_permanent);
+ return;
+ }
}
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
if ((value =
soup_message_headers_get_one (msg->response_headers,
"icy-metaint")) != NULL) {
- gint icy_metaint = atoi (value);
+ gint icy_metaint;
- GST_DEBUG_OBJECT (src, "icy-metaint: %s (parsed: %d)", value, icy_metaint);
- if (icy_metaint > 0) {
- if (src->src_caps)
- gst_caps_unref (src->src_caps);
+ if (g_utf8_validate (value, -1, NULL)) {
+ icy_metaint = atoi (value);
+
+ GST_DEBUG_OBJECT (src, "icy-metaint: %s (parsed: %d)", value,
+ icy_metaint);
+ if (icy_metaint > 0) {
+ if (src->src_caps)
+ gst_caps_unref (src->src_caps);
- src->src_caps = gst_caps_new_simple ("application/x-icy",
- "metadata-interval", G_TYPE_INT, icy_metaint, NULL);
+ src->src_caps = gst_caps_new_simple ("application/x-icy",
+ "metadata-interval", G_TYPE_INT, icy_metaint, NULL);
- gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+ gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+ }
}
}
if ((value =
soup_message_headers_get_content_type (msg->response_headers,
¶ms)) != NULL) {
- GST_DEBUG_OBJECT (src, "Content-Type: %s", value);
- if (g_ascii_strcasecmp (value, "audio/L16") == 0) {
+ if (!g_utf8_validate (value, -1, NULL)) {
+ GST_WARNING_OBJECT (src, "Content-Type is invalid UTF-8");
+ } else if (g_ascii_strcasecmp (value, "audio/L16") == 0) {
gint channels = 2;
gint rate = 44100;
char *param;
- if (src->src_caps)
+ GST_DEBUG_OBJECT (src, "Content-Type: %s", value);
+
+ if (src->src_caps) {
gst_caps_unref (src->src_caps);
+ src->src_caps = NULL;
+ }
param = g_hash_table_lookup (params, "channels");
- if (param != NULL)
- channels = atol (param);
+ if (param != NULL) {
+ guint64 val = g_ascii_strtoull (param, NULL, 10);
+ if (val < 64)
+ channels = val;
+ else
+ channels = 0;
+ }
param = g_hash_table_lookup (params, "rate");
- if (param != NULL)
- rate = atol (param);
+ if (param != NULL) {
+ guint64 val = g_ascii_strtoull (param, NULL, 10);
+ if (val < G_MAXINT)
+ rate = val;
+ else
+ rate = 0;
+ }
- src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
- "format", G_TYPE_STRING, "S16BE",
- "layout", G_TYPE_STRING, "interleaved",
- "channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
+ if (rate > 0 && channels > 0) {
+ src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
+ "format", G_TYPE_STRING, "S16BE",
+ "layout", G_TYPE_STRING, "interleaved",
+ "channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
- gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+ gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+ }
} else {
+ GST_DEBUG_OBJECT (src, "Content-Type: %s", value);
+
/* Set the Content-Type field on the caps */
if (src->src_caps) {
src->src_caps = gst_caps_make_writable (src->src_caps);
if ((value =
soup_message_headers_get_one (msg->response_headers,
"icy-name")) != NULL) {
- g_free (src->iradio_name);
- src->iradio_name = gst_soup_http_src_unicodify (value);
- if (src->iradio_name) {
- gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_ORGANIZATION,
- src->iradio_name, NULL);
+ if (g_utf8_validate (value, -1, NULL)) {
+ g_free (src->iradio_name);
+ src->iradio_name = gst_soup_http_src_unicodify (value);
+ if (src->iradio_name) {
+ gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_ORGANIZATION,
+ src->iradio_name, NULL);
+ }
}
}
if ((value =
soup_message_headers_get_one (msg->response_headers,
"icy-genre")) != NULL) {
- g_free (src->iradio_genre);
- src->iradio_genre = gst_soup_http_src_unicodify (value);
- if (src->iradio_genre) {
- gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_GENRE,
- src->iradio_genre, NULL);
+ if (g_utf8_validate (value, -1, NULL)) {
+ g_free (src->iradio_genre);
+ src->iradio_genre = gst_soup_http_src_unicodify (value);
+ if (src->iradio_genre) {
+ gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_GENRE,
+ src->iradio_genre, NULL);
+ }
}
}
if ((value = soup_message_headers_get_one (msg->response_headers, "icy-url"))
!= NULL) {
- g_free (src->iradio_url);
- src->iradio_url = gst_soup_http_src_unicodify (value);
- if (src->iradio_url) {
- gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_LOCATION,
- src->iradio_url, NULL);
+ if (g_utf8_validate (value, -1, NULL)) {
+ g_free (src->iradio_url);
+ src->iradio_url = gst_soup_http_src_unicodify (value);
+ if (src->iradio_url) {
+ gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_LOCATION,
+ src->iradio_url, NULL);
+ }
}
}
if (!gst_tag_list_is_empty (tag_list)) {
if (src->ret == GST_FLOW_CUSTOM_ERROR &&
src->read_position && msg->status_code != SOUP_STATUS_PARTIAL_CONTENT) {
src->seekable = FALSE;
- GST_ELEMENT_ERROR (src, RESOURCE, SEEK,
+ GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, SEEK,
(_("Server does not support seeking.")),
("Server does not accept Range HTTP header, URL: %s, Redirect to: %s",
- src->location, GST_STR_NULL (src->redirection_uri)));
+ src->location, GST_STR_NULL (src->redirection_uri)),
+ ("http-status-code", G_TYPE_UINT, msg->status_code,
+ "http-redirection-uri", G_TYPE_STRING,
+ GST_STR_NULL (src->redirection_uri), NULL));
src->ret = GST_FLOW_ERROR;
}
}
#define SOUP_HTTP_SRC_ERROR(src,soup_msg,cat,code,error_message) \
- GST_ELEMENT_ERROR ((src), cat, code, ("%s", error_message), \
- ("%s (%d), URL: %s, Redirect to: %s", (soup_msg)->reason_phrase, \
- (soup_msg)->status_code, (src)->location, GST_STR_NULL ((src)->redirection_uri)));
+ do { \
+ GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
+ ("%s (%d), URL: %s, Redirect to: %s", (soup_msg)->reason_phrase, \
+ (soup_msg)->status_code, (src)->location, GST_STR_NULL ((src)->redirection_uri)), \
+ ("http-status-code", G_TYPE_UINT, msg->status_code, \
+ "http-redirect-uri", G_TYPE_STRING, GST_STR_NULL ((src)->redirection_uri), NULL)); \
+ } while(0)
static void
gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src)
} else if (SOUP_STATUS_IS_CLIENT_ERROR (msg->status_code) ||
SOUP_STATUS_IS_REDIRECTION (msg->status_code) ||
SOUP_STATUS_IS_SERVER_ERROR (msg->status_code)) {
+ const gchar *reason_phrase;
+
+ reason_phrase = msg->reason_phrase;
+ if (reason_phrase && !g_utf8_validate (reason_phrase, -1, NULL)) {
+ GST_ERROR_OBJECT (src, "Invalid UTF-8 in reason");
+ reason_phrase = "(invalid)";
+ }
+
/* Report HTTP error. */
/* when content_size is unknown and we have just finished receiving
* error dialog according to libsoup documentation.
*/
if (msg->status_code == SOUP_STATUS_NOT_FOUND) {
- GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
- ("%s", msg->reason_phrase),
- ("%s (%d), URL: %s, Redirect to: %s", msg->reason_phrase,
+ GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, NOT_FOUND,
+ ("%s", reason_phrase),
+ ("%s (%d), URL: %s, Redirect to: %s", reason_phrase,
msg->status_code, src->location,
- GST_STR_NULL (src->redirection_uri)));
+ GST_STR_NULL (src->redirection_uri)),
+ ("http-status-code", G_TYPE_UINT, msg->status_code,
+ "http-redirect-uri", G_TYPE_STRING,
+ GST_STR_NULL (src->redirection_uri), NULL));
} else if (msg->status_code == SOUP_STATUS_UNAUTHORIZED
|| msg->status_code == SOUP_STATUS_PAYMENT_REQUIRED
|| msg->status_code == SOUP_STATUS_FORBIDDEN
|| msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
- GST_ELEMENT_ERROR (src, RESOURCE, NOT_AUTHORIZED, ("%s",
- msg->reason_phrase), ("%s (%d), URL: %s, Redirect to: %s",
- msg->reason_phrase, msg->status_code, src->location,
- GST_STR_NULL (src->redirection_uri)));
+ GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, NOT_AUTHORIZED, ("%s",
+ reason_phrase), ("%s (%d), URL: %s, Redirect to: %s",
+ reason_phrase, msg->status_code, src->location,
+ GST_STR_NULL (src->redirection_uri)), ("http-status-code",
+ G_TYPE_UINT, msg->status_code, "http-redirect-uri", G_TYPE_STRING,
+ GST_STR_NULL (src->redirection_uri), NULL));
} else {
- GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
- ("%s", msg->reason_phrase),
- ("%s (%d), URL: %s, Redirect to: %s", msg->reason_phrase,
+ GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, OPEN_READ,
+ ("%s", reason_phrase),
+ ("%s (%d), URL: %s, Redirect to: %s", reason_phrase,
msg->status_code, src->location,
- GST_STR_NULL (src->redirection_uri)));
+ GST_STR_NULL (src->redirection_uri)),
+ ("http-status-code", G_TYPE_UINT, msg->status_code,
+ "http-redirect-uri", G_TYPE_STRING,
+ GST_STR_NULL (src->redirection_uri), NULL));
}
src->ret = GST_FLOW_ERROR;
}
return TRUE;
}
-static void
-gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
-{
- if (!src->input_stream)
- return;
-
- src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
- && g_pollable_input_stream_can_poll ((GPollableInputStream *)
- src->input_stream);
-}
-
static GstFlowReturn
gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
{
g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
- g_assert (src->input_stream == NULL);
- g_assert (src->poll_source == NULL);
-
/* FIXME We are ignoring the GError here, might be useful to debug */
src->input_stream =
soup_session_send (src->session, src->msg, src->cancellable, NULL);
return GST_FLOW_ERROR;
}
- gst_soup_http_src_check_input_stream_interfaces (src);
-
return GST_FLOW_OK;
}
return src->ret;
}
+/*
+ * Check if the bytes_read is above a certain threshold of the blocksize, if
+ * that happens a few times in a row, increase the blocksize; Do the same in
+ * the opposite direction to reduce the blocksize.
+ */
+static void
+gst_soup_http_src_check_update_blocksize (GstSoupHTTPSrc * src,
+ gint64 bytes_read)
+{
+ guint blocksize = gst_base_src_get_blocksize (GST_BASE_SRC_CAST (src));
+
+ GST_LOG_OBJECT (src, "Checking to update blocksize. Read:%" G_GINT64_FORMAT
+ " blocksize:%u", bytes_read, blocksize);
+
+ if (bytes_read >= blocksize * GROW_BLOCKSIZE_LIMIT) {
+ src->reduce_blocksize_count = 0;
+ src->increase_blocksize_count++;
+
+ if (src->increase_blocksize_count >= GROW_BLOCKSIZE_COUNT) {
+ blocksize *= GROW_BLOCKSIZE_FACTOR;
+ GST_DEBUG_OBJECT (src, "Increased blocksize to %u", blocksize);
+ gst_base_src_set_blocksize (GST_BASE_SRC_CAST (src), blocksize);
+ src->increase_blocksize_count = 0;
+ }
+ } else if (bytes_read < blocksize * REDUCE_BLOCKSIZE_LIMIT) {
+ src->reduce_blocksize_count++;
+ src->increase_blocksize_count = 0;
+
+ if (src->reduce_blocksize_count >= REDUCE_BLOCKSIZE_COUNT) {
+ blocksize *= REDUCE_BLOCKSIZE_FACTOR;
+ blocksize = MAX (blocksize, src->minimum_blocksize);
+ GST_DEBUG_OBJECT (src, "Decreased blocksize to %u", blocksize);
+ gst_base_src_set_blocksize (GST_BASE_SRC_CAST (src), blocksize);
+ src->reduce_blocksize_count = 0;
+ }
+ } else {
+ src->reduce_blocksize_count = src->increase_blocksize_count = 0;
+ }
+}
+
static void
gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
{
}
}
-static gboolean
-_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
- gpointer udata)
-{
- GstSoupHTTPSrc *src = udata;
-
- src->have_data = TRUE;
- return TRUE;
-}
-
-/* Need to wait on a gsource to know when data is available */
-static gboolean
-gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
-{
- src->have_data = FALSE;
-
- if (!src->poll_source) {
- src->poll_source =
- g_pollable_input_stream_create_source ((GPollableInputStream *)
- src->input_stream, src->cancellable);
- g_source_set_callback (src->poll_source,
- (GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
- g_source_attach (src->poll_source, src->poll_context);
- }
-
- while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
- g_main_context_iteration (src->poll_context, TRUE);
- }
-
- return src->have_data;
-}
-
static GstFlowReturn
gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
{
GstMapInfo mapinfo;
GstBaseSrc *bsrc;
GstFlowReturn ret;
- GError *err = NULL;
bsrc = GST_BASE_SRC_CAST (src);
return GST_FLOW_ERROR;
}
- if (src->has_pollable_interface) {
- while (1) {
- read_bytes =
- g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
- src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
- &err);
- if (read_bytes == -1) {
- if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_error_free (err);
- err = NULL;
-
- /* no data yet, wait */
- if (gst_soup_http_src_wait_for_data (src))
- /* retry */
- continue;
- }
- }
- break;
- }
- } else {
- read_bytes =
- g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
- src->cancellable, NULL);
- }
-
- if (err)
- g_error_free (err);
-
+ read_bytes =
+ g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
+ src->cancellable, NULL);
GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
read_bytes);
g_mutex_unlock (&src->mutex);
return GST_FLOW_FLUSHING;
}
- g_mutex_unlock (&src->mutex);
gst_buffer_unmap (*outbuf, &mapinfo);
if (read_bytes > 0) {
/* Got some data, reset retry counter */
src->retry_count = 0;
+
+ gst_soup_http_src_check_update_blocksize (src, read_bytes);
+
+ /* If we're at the end of a range request, read again to let libsoup
+ * finalize the request. This allows to reuse the connection again later,
+ * otherwise we would have to cancel the message and close the connection
+ */
+ if (bsrc->segment.stop != -1
+ && bsrc->segment.position + read_bytes >= bsrc->segment.stop) {
+ guint8 tmp[128];
+
+ g_object_unref (src->msg);
+ src->msg = NULL;
+ src->have_body = TRUE;
+
+ /* This should return immediately as we're at the end of the range */
+ read_bytes =
+ g_input_stream_read (src->input_stream, tmp, sizeof (tmp),
+ src->cancellable, NULL);
+ if (read_bytes > 0)
+ GST_ERROR_OBJECT (src,
+ "Read %" G_GSIZE_FORMAT " bytes after end of range", read_bytes);
+ }
} else {
gst_buffer_unref (*outbuf);
if (read_bytes < 0) {
/* Maybe the server disconnected, retry */
ret = GST_FLOW_CUSTOM_ERROR;
} else {
+ g_object_unref (src->msg);
+ src->msg = NULL;
ret = GST_FLOW_EOS;
src->have_body = TRUE;
}
}
+ g_mutex_unlock (&src->mutex);
+
return ret;
}
/* Check for pending position change */
if (src->request_position != src->read_position) {
- gst_soup_http_src_destroy_input_stream (src);
+ if (src->input_stream) {
+ g_input_stream_close (src->input_stream, src->cancellable, NULL);
+ g_object_unref (src->input_stream);
+ src->input_stream = NULL;
+ }
}
if (g_cancellable_is_cancelled (src->cancellable)) {
gst_event_unref (http_headers_event);
g_mutex_lock (&src->mutex);
- gst_soup_http_src_destroy_input_stream (src);
+ if (src->input_stream) {
+ g_object_unref (src->input_stream);
+ src->input_stream = NULL;
+ }
g_mutex_unlock (&src->mutex);
if (ret == GST_FLOW_CUSTOM_ERROR)
goto retry;