dashdemux: Properly stop download and stream tasks where appropriate.
authorAndre Moreira Magalhaes (andrunko) <andre.magalhaes@collabora.co.uk>
Tue, 5 Feb 2013 00:58:32 +0000 (22:58 -0200)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 21:14:39 +0000 (18:14 -0300)
ext/dash/gstdashdemux.c
ext/dash/gsturidownloader.c
ext/dash/gsturidownloader.h

index 24d4fb2..cf2a944 100644 (file)
@@ -276,23 +276,15 @@ gst_dash_demux_dispose (GObject * obj)
 {
   GstDashDemux *demux = GST_DASH_DEMUX (obj);
 
+  gst_dash_demux_reset (demux, TRUE);
+
   if (demux->stream_task) {
-    if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
-      GST_DEBUG_OBJECT (demux, "Leaving streaming task");
-      gst_task_stop (demux->stream_task);
-      gst_task_join (demux->stream_task);
-    }
     gst_object_unref (demux->stream_task);
     g_static_rec_mutex_free (&demux->stream_lock);
     demux->stream_task = NULL;
   }
 
   if (demux->download_task) {
-    if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
-      GST_DEBUG_OBJECT (demux, "Leaving download task");
-      gst_task_stop (demux->download_task);
-      gst_task_join (demux->download_task);
-    }
     gst_object_unref (demux->download_task);
     g_static_rec_mutex_free (&demux->download_lock);
     demux->download_task = NULL;
@@ -303,8 +295,6 @@ gst_dash_demux_dispose (GObject * obj)
     demux->downloader = NULL;
   }
 
-  gst_dash_demux_reset (demux, TRUE);
-
   G_OBJECT_CLASS (parent_class)->dispose (obj);
 }
 
@@ -438,9 +428,11 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
   GstDashDemux *demux = GST_DASH_DEMUX (element);
 
   switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
+    case GST_STATE_CHANGE_PAUSED_TO_READY:
       gst_dash_demux_reset (demux, FALSE);
       break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       /* Start the streaming loop in paused only if we already received
          the manifest. It might have been stopped if we were in PAUSED
@@ -460,10 +452,6 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
       gst_dash_demux_pause_stream_task (demux);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      demux->cancelled = TRUE;
-      gst_dash_demux_stop (demux);
-      gst_task_join (demux->stream_task);
-      gst_task_join (demux->download_task);
       break;
     default:
       break;
@@ -471,18 +459,6 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
   return ret;
 }
 
-static void
-gst_dash_demux_clear_queues (GstDashDemux * demux)
-{
-  GSList *iter;
-
-  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
-    GstDashDemuxStream *stream = iter->data;
-
-    gst_data_queue_flush (stream->queue);
-  }
-}
-
 static gboolean
 _check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
     GstDashDemuxStream * stream)
@@ -531,7 +507,6 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
   GstDashDemux *demux;
 
   demux = GST_DASH_DEMUX (gst_pad_get_element_private (pad));
-  GST_WARNING_OBJECT (demux, "Received an event");
 
   switch (event->type) {
     case GST_EVENT_SEEK:
@@ -584,15 +559,12 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
 
         /* Stop the demux */
         demux->cancelled = TRUE;
+        /* Stop the demux, also clears the buffering queue */
         gst_dash_demux_stop (demux);
 
         /* Wait for streaming to finish */
         g_static_rec_mutex_lock (&demux->stream_lock);
 
-        /* Clear the buffering queue */
-        /* FIXME: allow seeking in the buffering queue */
-        gst_dash_demux_clear_queues (demux);
-
         //GST_MPD_CLIENT_LOCK (demux->client);
 
         /* select the requested Period in the Media Presentation */
@@ -672,6 +644,7 @@ gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
           stream->need_segment = TRUE;
           gst_data_queue_set_flushing (stream->queue, FALSE);
         }
+        gst_uri_downloader_reset (demux->downloader);
         gst_dash_demux_resume_download_task (demux);
         gst_dash_demux_resume_stream_task (demux);
         g_static_rec_mutex_unlock (&demux->stream_lock);
@@ -754,9 +727,12 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
 static gboolean
 gst_dash_demux_sink_event (GstPad * pad, GstEvent * event)
 {
-  GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
+  GstDashDemux *demux = GST_DASH_DEMUX (GST_PAD_PARENT (pad));
 
   switch (event->type) {
+    case GST_EVENT_FLUSH_STOP:
+      gst_dash_demux_reset (demux, FALSE);
+      break;
     case GST_EVENT_EOS:{
       gchar *manifest;
       GstQuery *query;
@@ -913,15 +889,13 @@ gst_dash_demux_src_query (GstPad * pad, GstQuery * query)
 static GstFlowReturn
 gst_dash_demux_pad (GstPad * pad, GstBuffer * buf)
 {
-  GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
+  GstDashDemux *demux = GST_DASH_DEMUX (GST_PAD_PARENT (pad));
 
   if (demux->manifest == NULL)
     demux->manifest = buf;
   else
     demux->manifest = gst_buffer_join (demux->manifest, buf);
 
-  gst_object_unref (demux);
-
   return GST_FLOW_OK;
 }
 
@@ -930,7 +904,9 @@ gst_dash_demux_stop (GstDashDemux * demux)
 {
   GSList *iter;
 
-  gst_uri_downloader_cancel (demux->downloader);
+  if (demux->downloader)
+    gst_uri_downloader_cancel (demux->downloader);
+
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
@@ -940,10 +916,22 @@ gst_dash_demux_stop (GstDashDemux * demux)
   if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
     GST_TASK_SIGNAL (demux->download_task);
     gst_task_stop (demux->download_task);
+    g_static_rec_mutex_lock (&demux->download_lock);
+    g_static_rec_mutex_unlock (&demux->download_lock);
+    gst_task_join (demux->download_task);
   }
   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
     GST_TASK_SIGNAL (demux->stream_task);
     gst_task_stop (demux->stream_task);
+    g_static_rec_mutex_lock (&demux->stream_lock);
+    g_static_rec_mutex_unlock (&demux->stream_lock);
+    gst_task_join (demux->stream_task);
+  }
+
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;
+
+    gst_data_queue_flush (stream->queue);
   }
 }
 
@@ -1082,8 +1070,10 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
     eos = FALSE;
     eop = FALSE;
 
-    if (!gst_data_queue_peek (stream->queue, &item))
-      continue;
+    if (!gst_data_queue_peek (stream->queue, &item)) {
+      /* flushing */
+      goto flushing;
+    }
 
     if (G_LIKELY (GST_IS_BUFFER (item->object))) {
       pad_switch = FALSE;
@@ -1170,8 +1160,16 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
   }
 
 end:
+  GST_INFO_OBJECT (demux, "Leaving streaming task");
   return;
 
+flushing:
+  {
+    GST_WARNING_OBJECT (demux, "Flushing, leaving streaming task");
+    gst_task_stop (demux->stream_task);
+    return;
+  }
+
 end_of_manifest:
   {
     GST_INFO_OBJECT (demux, "Reached end of manifest, sending EOS");
@@ -1190,7 +1188,7 @@ error_pushing:
     GST_ERROR_OBJECT (demux,
         "Error pushing buffer: %s... terminating the demux",
         gst_flow_get_name (ret));
-    gst_dash_demux_stop (demux);
+    gst_task_stop (demux->stream_task);
     return;
   }
 }
@@ -1198,13 +1196,18 @@ error_pushing:
 static void
 gst_dash_demux_stream_free (GstDashDemuxStream * stream)
 {
-  if (stream->input_caps)
+  if (stream->input_caps) {
     gst_caps_unref (stream->input_caps);
-  if (stream->pad)
+    stream->input_caps = NULL;
+  }
+  if (stream->pad) {
     gst_object_unref (stream->pad);
-
-  /* TODO flush the queue */
-  g_object_unref (stream->queue);
+    stream->pad = NULL;
+  }
+  if (stream->queue) {
+    g_object_unref (stream->queue);
+    stream->queue = NULL;
+  }
 
   g_free (stream);
 }
@@ -1216,12 +1219,16 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
 
   demux->end_of_period = FALSE;
   demux->end_of_manifest = FALSE;
-  demux->cancelled = FALSE;
 
-  gst_dash_demux_clear_queues (demux);
+  demux->cancelled = TRUE;
+  gst_dash_demux_stop (demux);
+  if (demux->downloader)
+    gst_uri_downloader_reset (demux->downloader);
 
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
+    if (stream->pad)
+      gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
     gst_dash_demux_stream_free (stream);
   }
   g_slist_free (demux->streams);
@@ -1243,6 +1250,7 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
   demux->last_manifest_update = GST_CLOCK_TIME_NONE;
   demux->position = 0;
   demux->position_shift = 0;
+  demux->cancelled = FALSE;
 }
 
 static GstClockTime
@@ -1432,6 +1440,8 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
       } else {
         goto error_downloading;
       }
+    } else if (demux->cancelled) {
+      goto cancelled;
     } else {
       goto quit;
     }
@@ -1441,13 +1451,18 @@ gst_dash_demux_download_loop (GstDashDemux * demux)
   demux->client->update_failed_count = 0;
 
 quit:
+  return;
+
+cancelled:
   {
+    GST_WARNING_OBJECT (demux, "Cancelled, leaving download task");
+    gst_task_stop (demux->download_task);
     return;
   }
 
 end_of_manifest:
   {
-    GST_INFO_OBJECT (demux, "Stopped download task");
+    GST_INFO_OBJECT (demux, "End of manifest, leaving download task");
     gst_task_stop (demux->download_task);
     return;
   }
@@ -1455,8 +1470,8 @@ end_of_manifest:
 error_downloading:
   {
     GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
-        ("Could not fetch the next fragment"), (NULL));
-    gst_dash_demux_stop (demux);
+        ("Could not fetch the next fragment, leaving download task"), (NULL));
+    gst_task_stop (demux->download_task);
     return;
   }
 }
@@ -1775,18 +1790,18 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux)
       if (download == NULL)
         return FALSE;
 
-      buffer = gst_fragment_get_buffer (download);
-
       active_stream =
           gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
       if (active_stream == NULL)        /* TODO unref fragments */
         return FALSE;
 
+      buffer = gst_fragment_get_buffer (download);
+
       if (selected_stream->need_header) {
         /* We need to fetch a new header */
         if ((header =
                 gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
-          GST_INFO_OBJECT (demux, "Unable to fetch header");
+          GST_WARNING_OBJECT (demux, "Unable to fetch header");
         } else {
           GstBuffer *header_buffer;
           /* Replace fragment with a new one including the header */
index 4313f22..9b56cbf 100644 (file)
@@ -19,8 +19,6 @@
  * Boston, MA 02111-1307, USA.
  */
 
-#define GLIB_DISABLE_DEPRECATION_WARNINGS
-
 #include <glib.h>
 #include "gstfragmented.h"
 #include "gstfragment.h"
@@ -43,6 +41,7 @@ struct _GstUriDownloaderPrivate
   GstFragment *download;
   GMutex *lock;
   GCond *cond;
+  gboolean cancelled;
 };
 
 static void gst_uri_downloader_finalize (GObject * object);
@@ -93,7 +92,7 @@ gst_uri_downloader_init (GstUriDownloader * downloader)
   gst_pad_set_event_function (downloader->priv->pad,
       GST_DEBUG_FUNCPTR (gst_uri_downloader_sink_event));
   gst_pad_set_element_private (downloader->priv->pad, downloader);
-  gst_pad_activate_push (downloader->priv->pad, TRUE);
+  gst_pad_set_active (downloader->priv->pad, TRUE);
 
   /* Create a bus to handle error and warning message from the source element */
   downloader->priv->bus = gst_bus_new ();
@@ -150,32 +149,33 @@ gst_uri_downloader_new (void)
 static gboolean
 gst_uri_downloader_sink_event (GstPad * pad, GstEvent * event)
 {
-  GstUriDownloader *downloader =
-      (GstUriDownloader *) (gst_pad_get_element_private (pad));
+  gboolean ret = FALSE;
+  GstUriDownloader *downloader;
+
+  downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
 
   switch (event->type) {
     case GST_EVENT_EOS:{
-      GST_OBJECT_LOCK (downloader);
+      g_mutex_lock (downloader->priv->lock);
       GST_DEBUG_OBJECT (downloader, "Got EOS on the fetcher pad");
       if (downloader->priv->download != NULL) {
         /* signal we have fetched the URI */
         downloader->priv->download->completed = TRUE;
-        downloader->priv->download->download_stop_time = g_get_real_time ();
-        GST_OBJECT_UNLOCK (downloader);
+        downloader->priv->download->download_stop_time =
+            gst_util_get_timestamp ();
         GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
         g_cond_signal (downloader->priv->cond);
-
-      } else {
-        GST_OBJECT_UNLOCK (downloader);
       }
+      g_mutex_unlock (downloader->priv->lock);
+      gst_event_unref (event);
       break;
     }
     default:
+      ret = gst_pad_event_default (pad, event);
       break;
   }
 
-  gst_event_unref (event);
-  return FALSE;
+  return ret;
 }
 
 static GstBusSyncReply
@@ -186,9 +186,17 @@ gst_uri_downloader_bus_handler (GstBus * bus,
 
   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR ||
       GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
+    GError *err = NULL;
+    gchar *dbg_info = NULL;
+
+    gst_message_parse_error (message, &err, &dbg_info);
     GST_WARNING_OBJECT (downloader,
-        "Received error in bus from the source element, "
-        "the download will be cancelled");
+        "Received error: %s from %s, the download will be cancelled",
+        GST_OBJECT_NAME (message->src), err->message);
+    GST_DEBUG ("Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
+    g_error_free (err);
+    g_free (dbg_info);
+
     /* remove the sync handler to avoid duplicated messages */
     gst_bus_set_sync_handler (downloader->priv->bus, NULL, NULL);
     gst_uri_downloader_cancel (downloader);
@@ -201,29 +209,28 @@ gst_uri_downloader_bus_handler (GstBus * bus,
 static GstFlowReturn
 gst_uri_downloader_chain (GstPad * pad, GstBuffer * buf)
 {
-  GstUriDownloader *downloader =
-      (GstUriDownloader *) gst_pad_get_element_private (pad);
+  GstUriDownloader *downloader;
+
+  downloader = GST_URI_DOWNLOADER (gst_pad_get_element_private (pad));
 
   /* HTML errors (404, 500, etc...) are also pushed through this pad as
    * response but the source element will also post a warning or error message
    * in the bus, which is handled synchronously cancelling the download.
    */
-  GST_OBJECT_LOCK (downloader);
+  g_mutex_lock (downloader->priv->lock);
   if (downloader->priv->download == NULL) {
     /* Download cancelled, quit */
-    GST_OBJECT_UNLOCK (downloader);
     goto done;
   }
 
-  GST_LOG_OBJECT (downloader,
-      "The uri fetcher received a new buffer of size %u",
-      GST_BUFFER_SIZE (buf));
+  GST_LOG_OBJECT (downloader, "The uri fetcher received a new buffer "
+      "of size %u", GST_BUFFER_SIZE (buf));
   if (!gst_fragment_add_buffer (downloader->priv->download, buf))
     GST_WARNING_OBJECT (downloader, "Could not add buffer to fragment");
-  GST_OBJECT_UNLOCK (downloader);
 
 done:
   {
+    g_mutex_unlock (downloader->priv->lock);
     return GST_FLOW_OK;
   }
 }
@@ -250,20 +257,39 @@ gst_uri_downloader_stop (GstUriDownloader * downloader)
 }
 
 void
+gst_uri_downloader_reset (GstUriDownloader * downloader)
+{
+  g_return_if_fail (downloader != NULL);
+
+  g_mutex_lock (downloader->priv->lock);
+  downloader->priv->cancelled = FALSE;
+  g_mutex_unlock (downloader->priv->lock);
+}
+
+void
 gst_uri_downloader_cancel (GstUriDownloader * downloader)
 {
-  GST_OBJECT_LOCK (downloader);
+  g_return_if_fail (downloader != NULL);
+
+  g_mutex_lock (downloader->priv->lock);
   if (downloader->priv->download != NULL) {
     GST_DEBUG_OBJECT (downloader, "Cancelling download");
     g_object_unref (downloader->priv->download);
     downloader->priv->download = NULL;
-    GST_OBJECT_UNLOCK (downloader);
+    downloader->priv->cancelled = TRUE;
     GST_DEBUG_OBJECT (downloader, "Signaling chain funtion");
     g_cond_signal (downloader->priv->cond);
+    g_mutex_unlock (downloader->priv->lock);
   } else {
-    GST_OBJECT_UNLOCK (downloader);
-    GST_DEBUG_OBJECT (downloader,
-        "Trying to cancell a download that was alredy cancelled");
+    gboolean cancelled;
+
+    cancelled = downloader->priv->cancelled;
+    downloader->priv->cancelled = TRUE;
+    g_mutex_unlock (downloader->priv->lock);
+
+    if (cancelled)
+      GST_DEBUG_OBJECT (downloader,
+          "Trying to cancel a download that was already cancelled");
   }
 }
 
@@ -272,23 +298,15 @@ gst_uri_downloader_set_uri (GstUriDownloader * downloader, const gchar * uri)
 {
   GstPad *pad;
 
+  g_return_val_if_fail (downloader != NULL, FALSE);
+
   if (!gst_uri_is_valid (uri))
     return FALSE;
 
-  if (downloader->priv->urisrc == NULL) {
-    GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s",
-        uri);
-    downloader->priv->urisrc =
-        gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
-    if (!downloader->priv->urisrc)
-      return FALSE;
-  } else {
-    GST_DEBUG_OBJECT (downloader,
-        "Reusing existing source element for the URI:%s", uri);
-    if (!gst_uri_handler_set_uri (GST_URI_HANDLER (downloader->priv->urisrc),
-            uri))
-      return FALSE;
-  }
+  GST_DEBUG_OBJECT (downloader, "Creating source element for the URI:%s", uri);
+  downloader->priv->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
+  if (!downloader->priv->urisrc)
+    return FALSE;
 
   /* add a sync handler for the bus messages to detect errors in the download */
   gst_element_set_bus (GST_ELEMENT (downloader->priv->urisrc),
@@ -330,18 +348,21 @@ gst_uri_downloader_fetch_uri (GstUriDownloader * downloader, const gchar * uri)
    *   - the download failed (Error message on the fetcher bus)
    *   - the download was canceled
    */
-  GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI");
+  GST_DEBUG_OBJECT (downloader, "Waiting to fetch the URI %s", uri);
+  if (downloader->priv->cancelled) {
+    g_object_unref (downloader->priv->download);
+    downloader->priv->download = NULL;
+    goto quit;
+  }
   g_cond_wait (downloader->priv->cond, downloader->priv->lock);
 
-  GST_OBJECT_LOCK (downloader);
   download = downloader->priv->download;
   downloader->priv->download = NULL;
-  GST_OBJECT_UNLOCK (downloader);
 
   if (download != NULL)
-    GST_INFO_OBJECT (downloader, "URI fetched successfully");
+    GST_INFO_OBJECT (downloader, "URI %s fetched successfully", uri);
   else
-    GST_INFO_OBJECT (downloader, "Error fetching URI");
+    GST_INFO_OBJECT (downloader, "Error fetching URI %s", uri);
 
 quit:
   {
index b311cf1..683f5e8 100644 (file)
@@ -57,8 +57,10 @@ GType gst_uri_downloader_get_type (void);
 
 GstUriDownloader * gst_uri_downloader_new (void);
 GstFragment * gst_uri_downloader_fetch_uri (GstUriDownloader * downloader, const gchar * uri);
+void gst_uri_downloader_reset (GstUriDownloader *downloader);
 void gst_uri_downloader_cancel (GstUriDownloader *downloader);
 
 G_END_DECLS
+
 #endif /* __GSTURIDOWNLOADER_H__ */