{
GstDashDemux *demux = GST_DASH_DEMUX (obj);
+ gst_dash_demux_reset (demux, TRUE);
+
if (demux->stream_task) {
- if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
- GST_DEBUG_OBJECT (demux, "Leaving streaming task");
- gst_task_stop (demux->stream_task);
- gst_task_join (demux->stream_task);
- }
gst_object_unref (demux->stream_task);
g_static_rec_mutex_free (&demux->stream_lock);
demux->stream_task = NULL;
}
if (demux->download_task) {
- if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
- GST_DEBUG_OBJECT (demux, "Leaving download task");
- gst_task_stop (demux->download_task);
- gst_task_join (demux->download_task);
- }
gst_object_unref (demux->download_task);
g_static_rec_mutex_free (&demux->download_lock);
demux->download_task = NULL;
demux->downloader = NULL;
}
- gst_dash_demux_reset (demux, TRUE);
-
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
GstDashDemux *demux = GST_DASH_DEMUX (element);
switch (transition) {
- case GST_STATE_CHANGE_READY_TO_PAUSED:
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_dash_demux_reset (demux, FALSE);
break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* Start the streaming loop in paused only if we already received
the manifest. It might have been stopped if we were in PAUSED
gst_dash_demux_pause_stream_task (demux);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
- demux->cancelled = TRUE;
- gst_dash_demux_stop (demux);
- gst_task_join (demux->stream_task);
- gst_task_join (demux->download_task);
break;
default:
break;
return ret;
}
-static void
-gst_dash_demux_clear_queues (GstDashDemux * demux)
-{
- GSList *iter;
-
- for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
- GstDashDemuxStream *stream = iter->data;
-
- gst_data_queue_flush (stream->queue);
- }
-}
-
static gboolean
_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
GstDashDemuxStream * stream)
GstDashDemux *demux;
demux = GST_DASH_DEMUX (gst_pad_get_element_private (pad));
- GST_WARNING_OBJECT (demux, "Received an event");
switch (event->type) {
case GST_EVENT_SEEK:
/* Stop the demux */
demux->cancelled = TRUE;
+ /* Stop the demux, also clears the buffering queue */
gst_dash_demux_stop (demux);
/* Wait for streaming to finish */
g_static_rec_mutex_lock (&demux->stream_lock);
- /* Clear the buffering queue */
- /* FIXME: allow seeking in the buffering queue */
- gst_dash_demux_clear_queues (demux);
-
//GST_MPD_CLIENT_LOCK (demux->client);
/* select the requested Period in the Media Presentation */
stream->need_segment = TRUE;
gst_data_queue_set_flushing (stream->queue, FALSE);
}
+ gst_uri_downloader_reset (demux->downloader);
gst_dash_demux_resume_download_task (demux);
gst_dash_demux_resume_stream_task (demux);
g_static_rec_mutex_unlock (&demux->stream_lock);
static gboolean
gst_dash_demux_sink_event (GstPad * pad, GstEvent * event)
{
- GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
+ GstDashDemux *demux = GST_DASH_DEMUX (GST_PAD_PARENT (pad));
switch (event->type) {
+ case GST_EVENT_FLUSH_STOP:
+ gst_dash_demux_reset (demux, FALSE);
+ break;
case GST_EVENT_EOS:{
gchar *manifest;
GstQuery *query;
static GstFlowReturn
gst_dash_demux_pad (GstPad * pad, GstBuffer * buf)
{
- GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
+ GstDashDemux *demux = GST_DASH_DEMUX (GST_PAD_PARENT (pad));
if (demux->manifest == NULL)
demux->manifest = buf;
else
demux->manifest = gst_buffer_join (demux->manifest, buf);
- gst_object_unref (demux);
-
return GST_FLOW_OK;
}
{
GSList *iter;
- gst_uri_downloader_cancel (demux->downloader);
+ if (demux->downloader)
+ gst_uri_downloader_cancel (demux->downloader);
+
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->download_task);
gst_task_stop (demux->download_task);
+ g_static_rec_mutex_lock (&demux->download_lock);
+ g_static_rec_mutex_unlock (&demux->download_lock);
+ gst_task_join (demux->download_task);
}
if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
GST_TASK_SIGNAL (demux->stream_task);
gst_task_stop (demux->stream_task);
+ g_static_rec_mutex_lock (&demux->stream_lock);
+ g_static_rec_mutex_unlock (&demux->stream_lock);
+ gst_task_join (demux->stream_task);
+ }
+
+ for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+ GstDashDemuxStream *stream = iter->data;
+
+ gst_data_queue_flush (stream->queue);
}
}
eos = FALSE;
eop = FALSE;
- if (!gst_data_queue_peek (stream->queue, &item))
- continue;
+ if (!gst_data_queue_peek (stream->queue, &item)) {
+ /* flushing */
+ goto flushing;
+ }
if (G_LIKELY (GST_IS_BUFFER (item->object))) {
pad_switch = FALSE;
}
end:
+ GST_INFO_OBJECT (demux, "Leaving streaming task");
return;
+flushing:
+ {
+ GST_WARNING_OBJECT (demux, "Flushing, leaving streaming task");
+ gst_task_stop (demux->stream_task);
+ return;
+ }
+
end_of_manifest:
{
GST_INFO_OBJECT (demux, "Reached end of manifest, sending EOS");
GST_ERROR_OBJECT (demux,
"Error pushing buffer: %s... terminating the demux",
gst_flow_get_name (ret));
- gst_dash_demux_stop (demux);
+ gst_task_stop (demux->stream_task);
return;
}
}
static void
gst_dash_demux_stream_free (GstDashDemuxStream * stream)
{
- if (stream->input_caps)
+ if (stream->input_caps) {
gst_caps_unref (stream->input_caps);
- if (stream->pad)
+ stream->input_caps = NULL;
+ }
+ if (stream->pad) {
gst_object_unref (stream->pad);
-
- /* TODO flush the queue */
- g_object_unref (stream->queue);
+ stream->pad = NULL;
+ }
+ if (stream->queue) {
+ g_object_unref (stream->queue);
+ stream->queue = NULL;
+ }
g_free (stream);
}
demux->end_of_period = FALSE;
demux->end_of_manifest = FALSE;
- demux->cancelled = FALSE;
- gst_dash_demux_clear_queues (demux);
+ demux->cancelled = TRUE;
+ gst_dash_demux_stop (demux);
+ if (demux->downloader)
+ gst_uri_downloader_reset (demux->downloader);
for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
GstDashDemuxStream *stream = iter->data;
+ if (stream->pad)
+ gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
gst_dash_demux_stream_free (stream);
}
g_slist_free (demux->streams);
demux->last_manifest_update = GST_CLOCK_TIME_NONE;
demux->position = 0;
demux->position_shift = 0;
+ demux->cancelled = FALSE;
}
static GstClockTime
} else {
goto error_downloading;
}
+ } else if (demux->cancelled) {
+ goto cancelled;
} else {
goto quit;
}
demux->client->update_failed_count = 0;
quit:
+ return;
+
+cancelled:
{
+ GST_WARNING_OBJECT (demux, "Cancelled, leaving download task");
+ gst_task_stop (demux->download_task);
return;
}
end_of_manifest:
{
- GST_INFO_OBJECT (demux, "Stopped download task");
+ GST_INFO_OBJECT (demux, "End of manifest, leaving download task");
gst_task_stop (demux->download_task);
return;
}
error_downloading:
{
GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
- ("Could not fetch the next fragment"), (NULL));
- gst_dash_demux_stop (demux);
+ ("Could not fetch the next fragment, leaving download task"), (NULL));
+ gst_task_stop (demux->download_task);
return;
}
}
if (download == NULL)
return FALSE;
- buffer = gst_fragment_get_buffer (download);
-
active_stream =
gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
if (active_stream == NULL) /* TODO unref fragments */
return FALSE;
+ buffer = gst_fragment_get_buffer (download);
+
if (selected_stream->need_header) {
/* We need to fetch a new header */
if ((header =
gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
- GST_INFO_OBJECT (demux, "Unable to fetch header");
+ GST_WARNING_OBJECT (demux, "Unable to fetch header");
} else {
GstBuffer *header_buffer;
/* Replace fragment with a new one including the header */
* Boston, MA 02111-1307, USA.
*/
-#define GLIB_DISABLE_DEPRECATION_WARNINGS
-
#include <glib.h>
#include "gstfragmented.h"
#include "gstfragment.h"
GstFragment *download;
GMutex *lock;
GCond *cond;
+ gboolean cancelled;
};
static void gst_uri_downloader_finalize (GObject * object);
gst_pad_set_event_function (downloader->priv->pad,
GST_DEBUG_FUNCPTR (gst_uri_downloader_sink_event));
gst_pad_set_element_private (downloader->priv->pad, downloader);
- gst_pad_activate_push (downloader->priv->pad, TRUE);
+ gst_pad_set_active (downloader->priv->pad, TRUE);
/* Create a bus to handle error and warning message from the source element */
downloader->priv->bus = gst_bus_new ();
static gboolean
gst_uri_downloader_sink_event (GstPad * pad, GstEvent * event)
{
- GstUriDownloader *downloader =
- (GstUriDownloader *) (gst_pad_get_element_private (pad));
+ gboolean ret = FALSE;
+ GstUriDownloader *downloader;
+
+ downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
switch (event->type) {
case GST_EVENT_EOS:{
- GST_OBJECT_LOCK (downloader);
+ g_mutex_lock (downloader->priv->lock);
GST_DEBUG_OBJECT (downloader, "Got EOS on the fetcher pad");
if (downloader->priv->download != NULL) {
/* signal we have fetched the URI */
downloader->priv->download->completed = TRUE;
- downloader->priv->download->download_stop_time = g_get_real_time ();
- GST_OBJECT_UNLOCK (downloader);
+ downloader->priv->download->download_stop_time =
+ gst_util_get_timestamp ();
GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
g_cond_signal (downloader->priv->cond);
-
- } else {
- GST_OBJECT_UNLOCK (downloader);
}
+ g_mutex_unlock (downloader->priv->lock);
+ gst_event_unref (event);
break;
}
default:
+ ret = gst_pad_event_default (pad, event);
break;
}
- gst_event_unref (event);
- return FALSE;
+ return ret;
}
static GstBusSyncReply
if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR ||
GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
+ GError *err = NULL;
+ gchar *dbg_info = NULL;
+
+ gst_message_parse_error (message, &err, &dbg_info);
GST_WARNING_OBJECT (downloader,
- "Received error in bus from the source element, "
- "the download will be cancelled");
+ "Received error: %s from %s, the download will be cancelled",
+ GST_OBJECT_NAME (message->src), err->message);
+ GST_DEBUG ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
+ g_error_free (err);
+ g_free (dbg_info);
+
/* remove the sync handler to avoid duplicated messages */
gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL);
gst_uri_downloader_cancel (downloader);
static GstFlowReturn
gst_uri_downloader_chain (GstPad * pad, GstBuffer * buf)
{
- GstUriDownloader *downloader =
- (GstUriDownloader *) gst_pad_get_element_private (pad);
+ GstUriDownloader *downloader;
+
+ downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
/* HTML errors (404, 500, etc...) are also pushed through this pad as
* response but the source element will also post a warning or error message
* in the bus, which is handled synchronously cancelling the download.
*/
- GST_OBJECT_LOCK (downloader);
+ g_mutex_lock (downloader->priv->lock);
if (downloader->priv->download == NULL) {
/* Download cancelled, quit */
- GST_OBJECT_UNLOCK (downloader);
goto done;
}
- GST_LOG_OBJECT (downloader,
- "The uri fetcher received a new buffer of size %u",
- GST_BUFFER_SIZE (buf));
+ GST_LOG_OBJECT (downloader, "The uri fetcher received a new buffer "
+ "of size %u", GST_BUFFER_SIZE (buf));
if (!gst_fragment_add_buffer (downloader->priv->download, buf))
GST_WARNING_OBJECT (downloader, "Could not add buffer to fragment");
- GST_OBJECT_UNLOCK (downloader);
done:
{
+ g_mutex_unlock (downloader->priv->lock);
return GST_FLOW_OK;
}
}
}
void
+gst_uri_downloader_reset (GstUriDownloader * downloader)
+{
+ g_return_if_fail (downloader != NULL);
+
+ g_mutex_lock (downloader->priv->lock);
+ downloader->priv->cancelled = FALSE;
+ g_mutex_unlock (downloader->priv->lock);
+}
+
+void
gst_uri_downloader_cancel (GstUriDownloader * downloader)
{
- GST_OBJECT_LOCK (downloader);
+ g_return_if_fail (downloader != NULL);
+
+ g_mutex_lock (downloader->priv->lock);
if (downloader->priv->download != NULL) {
GST_DEBUG_OBJECT (downloader, "Cancelling download");
g_object_unref (downloader->priv->download);
downloader->priv->download = NULL;
- GST_OBJECT_UNLOCK (downloader);
+ downloader->priv->cancelled = TRUE;
GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
g_cond_signal (downloader->priv->cond);
+ g_mutex_unlock (downloader->priv->lock);
} else {
- GST_OBJECT_UNLOCK (downloader);
- GST_DEBUG_OBJECT (downloader,
- "Trying to cancell a download that was alredy cancelled");
+ gboolean cancelled;
+
+ cancelled = downloader->priv->cancelled;
+ downloader->priv->cancelled = TRUE;
+ g_mutex_unlock (downloader->priv->lock);
+
+ if (cancelled)
+ GST_DEBUG_OBJECT (downloader,
+ "Trying to cancel a download that was already cancelled");
}
}
{
GstPad *pad;
+ g_return_val_if_fail (downloader != NULL, FALSE);
+
if (!gst_uri_is_valid (uri))
return FALSE;
- if (downloader->priv->urisrc == NULL) {
- GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s",
- uri);
- downloader->priv->urisrc =
- gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
- if (!downloader->priv->urisrc)
- return FALSE;
- } else {
- GST_DEBUG_OBJECT (downloader,
- "Reusing existing source element for the URI:%s", uri);
- if (!gst_uri_handler_set_uri (GST_URI_HANDLER (downloader->priv->urisrc),
- uri))
- return FALSE;
- }
+ GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s", uri);
+ downloader->priv->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
+ if (!downloader->priv->urisrc)
+ return FALSE;
/* add a sync handler for the bus messages to detect errors in the download */
gst_element_set_bus (GST_ELEMENT (downloader->priv->urisrc),
* - the download failed (Error message on the fetcher bus)
* - the download was canceled
*/
- GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI");
+ GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI %s", uri);
+ if (downloader->priv->cancelled) {
+ g_object_unref (downloader->priv->download);
+ downloader->priv->download = NULL;
+ goto quit;
+ }
g_cond_wait (downloader->priv->cond, downloader->priv->lock);
- GST_OBJECT_LOCK (downloader);
download = downloader->priv->download;
downloader->priv->download = NULL;
- GST_OBJECT_UNLOCK (downloader);
if (download != NULL)
- GST_INFO_OBJECT (downloader, "URI fetched successfully");
+ GST_INFO_OBJECT (downloader, "URI %s fetched successfully", uri);
else
- GST_INFO_OBJECT (downloader, "Error fetching URI");
+ GST_INFO_OBJECT (downloader, "Error fetching URI %s", uri);
quit:
{