dashdemux: remove stream loop thread
authorThiago Santos <ts.santos@sisa.samsung.com>
Fri, 20 Dec 2013 13:05:22 +0000 (10:05 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Tue, 24 Dec 2013 20:07:52 +0000 (17:07 -0300)
Download and push from the same task, makes code a lot simpler
to maintain. Also pushing from separate threads avoids deadlocking
when gst_pad_push blocks due to downstream queues being full.

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h

index 61723de..a1f3418 100644 (file)
 #include <string.h>
 #include <inttypes.h>
 #include <gst/base/gsttypefindhelper.h>
+#include "gst/gst-i18n-plugin.h"
 #include "gstdashdemux.h"
 #include "gstdash_debug.h"
 
@@ -184,14 +185,6 @@ enum
 #define GST_DASH_DEMUX_CLIENT_LOCK(d) g_mutex_lock (&d->client_lock)
 #define GST_DASH_DEMUX_CLIENT_UNLOCK(d) g_mutex_unlock (&d->client_lock)
 
-/* Custom internal event to signal end of period */
-#define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED)
-static GstEvent *
-gst_event_new_dash_eop (void)
-{
-  return gst_event_new_custom (GST_EVENT_DASH_EOP, NULL);
-}
-
 /* GObject */
 static void gst_dash_demux_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -212,17 +205,16 @@ static gboolean gst_dash_demux_src_event (GstPad * pad, GstObject * parent,
     GstEvent * event);
 static gboolean gst_dash_demux_src_query (GstPad * pad, GstObject * parent,
     GstQuery * query);
-static void gst_dash_demux_stream_loop (GstDashDemux * demux);
 static void gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream);
 static void gst_dash_demux_stop (GstDashDemux * demux);
-static void gst_dash_demux_resume_stream_task (GstDashDemux * demux);
+static void gst_dash_demux_wait_stop (GstDashDemux * demux);
 static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
 static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux);
-static gboolean
-gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
+static GstEvent
+    * gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
     stream);
-static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux,
-    GstDashDemuxStream * stream, GstClockTime * next_ts);
+static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream
+    * stream, GstClockTime * ts);
 static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
 static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
     GstClockTime time_diff);
@@ -232,11 +224,6 @@ static void gst_dash_demux_remove_streams (GstDashDemux * demux,
     GSList * streams);
 static void gst_dash_demux_stream_free (GstDashDemuxStream * stream);
 static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
-#ifndef GST_DISABLE_GST_DEBUG
-static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
-static GstClockTime gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream
-    * stream);
-#endif
 static GstCaps *gst_dash_demux_get_input_caps (GstDashDemux * demux,
     GstActiveStream * stream);
 static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux);
@@ -254,12 +241,6 @@ gst_dash_demux_dispose (GObject * obj)
 
   gst_dash_demux_reset (demux, TRUE);
 
-  if (demux->stream_task) {
-    gst_object_unref (demux->stream_task);
-    g_rec_mutex_clear (&demux->stream_task_lock);
-    demux->stream_task = NULL;
-  }
-
   if (demux->downloader != NULL) {
     g_object_unref (demux->downloader);
     demux->downloader = NULL;
@@ -341,12 +322,6 @@ gst_dash_demux_init (GstDashDemux * demux)
   demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
   demux->max_bitrate = DEFAULT_MAX_BITRATE;
 
-  /* Streaming task */
-  g_rec_mutex_init (&demux->stream_task_lock);
-  demux->stream_task =
-      gst_task_new ((GstTaskFunction) gst_dash_demux_stream_loop, demux, NULL);
-  gst_task_set_lock (demux->stream_task, &demux->stream_task_lock);
-
   g_mutex_init (&demux->client_lock);
 }
 
@@ -421,47 +396,6 @@ gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
   return ret;
 }
 
-static gboolean
-_check_queue_full (GstDataQueue * q, guint visible, guint bytes, guint64 time,
-    GstDashDemux * demux)
-{
-  return time >= demux->max_buffering_time;
-}
-
-static void
-_data_queue_item_destroy (GstDataQueueItem * item)
-{
-  gst_mini_object_unref (item->object);
-  g_free (item);
-}
-
-static void
-gst_dash_demux_stream_push_event (GstDashDemuxStream * stream, GstEvent * event)
-{
-  GstDataQueueItem *item = g_new0 (GstDataQueueItem, 1);
-
-  item->object = GST_MINI_OBJECT_CAST (event);
-  item->destroy = (GDestroyNotify) _data_queue_item_destroy;
-
-  gst_data_queue_push_force (stream->queue, item);
-}
-
-static void
-gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
-    GstBuffer * fragment)
-{
-  GstDataQueueItem *item = g_new (GstDataQueueItem, 1);
-
-  item->object = GST_MINI_OBJECT_CAST (fragment);
-  item->duration = GST_BUFFER_DURATION (fragment);
-  item->visible = TRUE;
-  item->size = gst_buffer_get_size (fragment);
-
-  item->destroy = (GDestroyNotify) _data_queue_item_destroy;
-
-  gst_data_queue_push (stream->queue, item);
-}
-
 static void
 gst_dash_demux_stream_seek (GstDashDemuxStream * stream,
     GstClockTime target_pos)
@@ -537,6 +471,10 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
           start, stop_type, stop, &update);
 
       if (update) {
+        GstEvent *seg_evt;
+
+        GST_DASH_DEMUX_CLIENT_LOCK (demux);
+
         if (flags & GST_SEEK_FLAG_FLUSH) {
           GST_DEBUG_OBJECT (demux, "sending flush start");
           for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
@@ -546,13 +484,9 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
           }
         }
 
-        /* Stop the demux */
-        demux->cancelled = TRUE;
-        /* Stop the demux, also clears the buffering queue */
         gst_dash_demux_stop (demux);
-
-        /* Wait for streaming to finish */
-        g_rec_mutex_lock (&demux->stream_task_lock);
+        GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
+        gst_dash_demux_wait_stop (demux);
 
         /* select the requested Period in the Media Presentation */
         target_pos = (GstClockTime) demux->segment.start;
@@ -594,9 +528,15 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         }
 
         /* Update the current sequence on all streams */
+        seg_evt = gst_event_new_segment (&demux->segment);
+        gst_event_set_seqnum (seg_evt, gst_event_get_seqnum (event));
         for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
-          gst_dash_demux_stream_seek (iter->data, target_pos);
+          GstDashDemuxStream *stream = iter->data;
+          gst_dash_demux_stream_seek (stream, target_pos);
+
+          gst_event_replace (&stream->pending_segment, seg_evt);
         }
+        gst_event_unref (seg_evt);
 
         if (flags & GST_SEEK_FLAG_FLUSH) {
           GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
@@ -604,31 +544,26 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
             GstDashDemuxStream *stream;
 
             stream = iter->data;
-            stream->has_data_queued = FALSE;
             stream->need_header = TRUE;
-            stream->download_end_of_period = FALSE;
-            stream->stream_end_of_period = FALSE;
             stream->stream_eos = FALSE;
             gst_pad_push_event (stream->pad, gst_event_new_flush_stop (TRUE));
           }
         }
 
         /* Restart the demux */
+        GST_DASH_DEMUX_CLIENT_LOCK (demux);
         demux->cancelled = FALSE;
         demux->end_of_manifest = FALSE;
         for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
           GstDashDemuxStream *stream = iter->data;
-          gst_data_queue_set_flushing (stream->queue, FALSE);
           stream->last_ret = GST_FLOW_OK;
           gst_uri_downloader_reset (stream->downloader);
         }
         demux->timestamp_offset = 0;
-        demux->need_segment = TRUE;
         gst_uri_downloader_reset (demux->downloader);
         GST_DEBUG_OBJECT (demux, "Resuming tasks after seeking");
         gst_dash_demux_resume_download_task (demux);
-        gst_dash_demux_resume_stream_task (demux);
-        g_rec_mutex_unlock (&demux->stream_task_lock);
+        GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
       }
 
       return TRUE;
@@ -646,8 +581,8 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
             stream->restart_download = TRUE;
             stream->need_header = TRUE;
             GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
+            gst_task_start (stream->download_task);
           }
-          gst_task_start (stream->download_task);
           GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
           gst_event_unref (event);
           return TRUE;
@@ -695,12 +630,10 @@ gst_dash_demux_all_streams_eop (GstDashDemux * demux)
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
-    GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)",
-        stream->download_end_of_period,
+    GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)", stream->stream_eos,
         stream->last_ret != GST_FLOW_NOT_LINKED);
 
-    if (!stream->download_end_of_period
-        && stream->last_ret != GST_FLOW_NOT_LINKED)
+    if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED)
       return FALSE;
   }
 
@@ -740,9 +673,6 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
     stream->demux = demux;
     stream->active_stream = active_stream;
     caps = gst_dash_demux_get_input_caps (demux, active_stream);
-    stream->queue =
-        gst_data_queue_new ((GstDataQueueCheckFullFunction) _check_queue_full,
-        NULL, NULL, demux);
 
     g_rec_mutex_init (&stream->download_task_lock);
     stream->download_task =
@@ -756,7 +686,6 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
     stream->index = i;
     stream->input_caps = caps;
     stream->need_header = TRUE;
-    stream->has_data_queued = FALSE;
     gst_download_rate_init (&stream->dnl_rate);
     gst_download_rate_set_max_length (&stream->dnl_rate,
         DOWNLOAD_RATE_HISTORY_MAX);
@@ -788,7 +717,7 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
     gst_pad_push_event (stream->pad, event);
     g_free (stream_id);
 
-    gst_dash_demux_stream_push_event (stream, gst_event_new_caps (caps));
+    gst_pad_push_event (stream->pad, gst_event_new_caps (caps));
   }
   streams = g_slist_reverse (streams);
 
@@ -821,6 +750,7 @@ gst_dash_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
 
+      GST_DASH_DEMUX_CLIENT_LOCK (demux);
       if (demux->client)
         gst_mpd_client_free (demux->client);
       demux->client = gst_mpd_client_new ();
@@ -933,9 +863,8 @@ gst_dash_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
         }
       }
       demux->timestamp_offset = -1;
-      demux->need_segment = TRUE;
       gst_dash_demux_resume_download_task (demux);
-      gst_dash_demux_resume_stream_task (demux);
+      GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
 
     seek_quit:
       gst_event_unref (event);
@@ -1049,43 +978,37 @@ gst_dash_demux_pad (GstPad * pad, GstObject * parent, GstBuffer * buf)
 }
 
 static void
-gst_dash_demux_stop (GstDashDemux * demux)
+gst_dash_demux_wait_stop (GstDashDemux * demux)
 {
   GSList *iter;
 
-  GST_DEBUG_OBJECT (demux, "Stopping demux");
-
-  if (demux->downloader)
-    gst_uri_downloader_cancel (demux->downloader);
-
+  GST_DEBUG_OBJECT (demux, "Waiting for threads to stop");
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
-    gst_data_queue_set_flushing (stream->queue, TRUE);
-    if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
-      stream->last_ret = GST_FLOW_FLUSHING;
-      stream->need_header = TRUE;
-      gst_task_stop (stream->download_task);
-      GST_TASK_SIGNAL (stream->download_task);
-      gst_uri_downloader_cancel (stream->downloader);
-
-      gst_task_join (stream->download_task);
-    }
+    gst_task_join (stream->download_task);
   }
+}
 
-  if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
-    GST_TASK_SIGNAL (demux->stream_task);
-    gst_task_stop (demux->stream_task);
-    g_rec_mutex_lock (&demux->stream_task_lock);
-    g_rec_mutex_unlock (&demux->stream_task_lock);
-    gst_task_join (demux->stream_task);
-  }
+static void
+gst_dash_demux_stop (GstDashDemux * demux)
+{
+  GSList *iter;
+
+  GST_DEBUG_OBJECT (demux, "Stopping demux");
+  demux->cancelled = TRUE;
+
+  if (demux->downloader)
+    gst_uri_downloader_cancel (demux->downloader);
 
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
-    gst_data_queue_flush (stream->queue);
-    stream->has_data_queued = FALSE;
+    stream->last_ret = GST_FLOW_FLUSHING;
+    stream->need_header = TRUE;
+    gst_task_stop (stream->download_task);
+    GST_TASK_SIGNAL (stream->download_task);
+    gst_uri_downloader_cancel (stream->downloader);
   }
 }
 
@@ -1114,8 +1037,9 @@ gst_dash_demux_expose_streams (GstDashDemux * demux)
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
-    GST_LOG_OBJECT (demux, "Exposing stream %d %" GST_PTR_FORMAT, stream->index,
-        stream->input_caps);
+    GST_LOG_OBJECT (stream->pad, "Exposing stream %d %" GST_PTR_FORMAT,
+        stream->index, stream->input_caps);
+
     gst_element_add_pad (GST_ELEMENT (demux), gst_object_ref (stream->pad));
   }
   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
@@ -1130,8 +1054,8 @@ gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams)
   for (iter = streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;;
 
-    GST_LOG_OBJECT (demux, "Removing stream %d %" GST_PTR_FORMAT, stream->index,
-        stream->input_caps);
+    GST_LOG_OBJECT (stream->pad, "Removing stream %d %" GST_PTR_FORMAT,
+        stream->index, stream->input_caps);
     gst_pad_push_event (stream->pad, gst_event_ref (eos));
     gst_pad_set_active (stream->pad, FALSE);
     gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
@@ -1145,7 +1069,8 @@ static gboolean
 gst_dash_demux_advance_period (GstDashDemux * demux)
 {
   GSList *old_period = NULL;
-  GST_DASH_DEMUX_CLIENT_LOCK (demux);
+  GSList *iter;
+  GstEvent *seg_evt;
 
   GST_DEBUG_OBJECT (demux, "Advancing period from %p", demux->streams);
 
@@ -1167,256 +1092,66 @@ gst_dash_demux_advance_period (GstDashDemux * demux)
     return FALSE;
   }
 
-  gst_dash_demux_expose_streams (demux);
-  gst_dash_demux_remove_streams (demux, old_period);
-
   GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-  return TRUE;
-}
-
-static GstFlowReturn
-gst_dash_demux_combine_flows (GstDashDemux * demux)
-{
-  gboolean all_notlinked = TRUE;
-  GSList *iter;
 
+  /* TODO protect with lock, using the client lock isn't useful
+   * because causes deadlocks with the event_src handler */
+  gst_dash_demux_expose_streams (demux);
+  seg_evt = gst_event_new_segment (&demux->segment);
   for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
     GstDashDemuxStream *stream = iter->data;
 
-    if (stream->last_ret != GST_FLOW_NOT_LINKED)
-      all_notlinked = FALSE;
-
-    if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
-        || stream->last_ret == GST_FLOW_FLUSHING)
-      return stream->last_ret;
+    gst_event_replace (&stream->pending_segment, seg_evt);
   }
-  if (all_notlinked)
-    return GST_FLOW_NOT_LINKED;
-  return GST_FLOW_OK;
-}
+  gst_event_unref (seg_evt);
+  gst_dash_demux_remove_streams (demux, old_period);
 
+  GST_DASH_DEMUX_CLIENT_LOCK (demux);
+  return TRUE;
+}
 
-/* gst_dash_demux_stream_loop:
- * 
- * Loop for the "stream' task that pushes fragments to the src pads.
- * 
- * Startup: 
- * The task is started as soon as we have received the manifest and
- * waits for the first fragment to be downloaded and pushed in the
- * queue. Once this fragment has been pushed, the task pauses itself
- * until actual playback begins.
- * 
- * During playback:  
- * The task pushes fragments downstream at regular intervals based on
- * the fragment duration. If it detects a queue underrun, it sends
- * a buffering event to tell the main application to pause.
- * 
- * Teardown:
- * The task is stopped when we have reached the end of the manifest
- * and emptied our queue.
- * 
- */
-static void
-gst_dash_demux_stream_loop (GstDashDemux * demux)
+static GstFlowReturn
+gst_dash_demux_push (GstDashDemuxStream * stream, GstBuffer * buffer)
 {
-  GstFlowReturn ret;
-  GSList *iter;
-  GstClockTime best_time;
-  GstDashDemuxStream *selected_stream;
-  gboolean eos = TRUE;
-  gboolean eop = TRUE;
-
-  GST_LOG_OBJECT (demux, "Starting stream loop");
-
-  best_time = GST_CLOCK_TIME_NONE;
-  selected_stream = NULL;
-
-  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
-    GstDashDemuxStream *stream = iter->data;
-    GstDataQueueItem *item;
-
-    GST_DEBUG_OBJECT (demux, "Peeking stream %d", stream->index);
-
-    if (stream->last_ret == GST_FLOW_NOT_LINKED)
-      continue;
-
-    if (stream->stream_eos) {
-      GST_DEBUG_OBJECT (demux, "Stream %d is eos, skipping", stream->index);
-      continue;
-    }
-
-    if (stream->stream_end_of_period) {
-      GST_DEBUG_OBJECT (demux, "Stream %d is eop, skipping", stream->index);
-      eos = FALSE;
-      continue;
-    }
-    eos = FALSE;
-    eop = FALSE;
-
-    GST_DEBUG_OBJECT (demux, "peeking at the queue for stream %d",
-        stream->index);
-    if (!gst_data_queue_peek (stream->queue, &item)) {
-      /* flushing */
-      goto flushing;
-    }
-
-    if (G_LIKELY (GST_IS_BUFFER (item->object))) {
-      GstBuffer *buffer;
-      GstClockTime timestamp;
-
-      buffer = GST_BUFFER_CAST (item->object);
-      timestamp = GST_BUFFER_TIMESTAMP (buffer);
-
-      GST_LOG_OBJECT (demux, "Buffer with time %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (timestamp));
-
-      if (timestamp < best_time) {
-        GST_DEBUG_OBJECT (demux, "Found new best time: %" GST_TIME_FORMAT " %p",
-            GST_TIME_ARGS (timestamp), buffer);
-        best_time = timestamp;
-        selected_stream = stream;
-      } else if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
-        selected_stream = stream;
-        GST_DEBUG_OBJECT (demux, "Buffer without timestamp selected %p",
-            buffer);
-        break;
-      }
-    } else {
-      GST_DEBUG_OBJECT (demux, "Non buffers have preference %" GST_PTR_FORMAT,
-          item->object);
-      selected_stream = stream;
-      break;
-    }
-  }
-
-  ret = GST_FLOW_OK;
-  if (selected_stream) {
-    GstDataQueueItem *item;
-
-    GST_DEBUG_OBJECT (demux, "Selected stream %p %d", selected_stream,
-        selected_stream->index);
-
-    if (!gst_data_queue_pop (selected_stream->queue, &item))
-      goto end;
-
-    if (G_LIKELY (GST_IS_BUFFER (item->object))) {
-      GstBuffer *buffer;
-      GstClockTime timestamp, duration;
-
-      buffer = GST_BUFFER_CAST (item->object);
-      timestamp = GST_BUFFER_TIMESTAMP (buffer);
-
-      if (demux->need_segment) {
-        if (demux->timestamp_offset == -1)
-          demux->timestamp_offset = timestamp;
+  GstDashDemux *demux = stream->demux;
+  GstClockTime timestamp, duration;
+  GstFlowReturn ret = GST_FLOW_OK;
 
-        /* And send a newsegment */
-        for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
-          GstDashDemuxStream *stream = iter->data;
-          gst_pad_push_event (stream->pad,
-              gst_event_new_segment (&demux->segment));
-        }
-        demux->need_segment = FALSE;
-      }
-      /* make timestamp start from 0 by subtracting the offset */
-      timestamp -= demux->timestamp_offset;
-      duration = GST_BUFFER_DURATION (buffer);
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
 
-      GST_BUFFER_TIMESTAMP (buffer) = timestamp;
+  if (stream->pending_segment) {
+    if (demux->timestamp_offset == -1)
+      demux->timestamp_offset = timestamp;
+    else
+      demux->timestamp_offset = MIN (timestamp, demux->timestamp_offset);
 
-      GST_DEBUG_OBJECT (demux,
-          "Pushing fragment ts: %" GST_TIME_FORMAT " at pad %s",
-          GST_TIME_ARGS (timestamp), GST_PAD_NAME (selected_stream->pad));
-      GST_DEBUG_OBJECT (demux,
-          "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
-          GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " at pad: %s:%s", buffer,
-          GST_BUFFER_OFFSET (buffer), selected_stream->index,
-          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-          GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
-          GST_DEBUG_PAD_NAME (selected_stream->pad));
-      ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
-      GST_DEBUG_OBJECT (demux, "Push result: %d %s", ret,
-          gst_flow_get_name (ret));
-
-      demux->segment.position = timestamp;
-      selected_stream->position = timestamp;
-      if (GST_CLOCK_TIME_IS_VALID (duration))
-        selected_stream->position += duration;
-
-      item->destroy (item);
-    } else {
-      /* a GstEvent */
-      if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) {
-        selected_stream->stream_end_of_period = TRUE;
-        selected_stream->stream_eos = TRUE;
-        ret = GST_FLOW_EOS;
-      } else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) {
-        selected_stream->stream_end_of_period = TRUE;
-      }
-
-      if (GST_EVENT_TYPE (item->object) != GST_EVENT_DASH_EOP) {
-        gst_pad_push_event (selected_stream->pad,
-            gst_event_ref (GST_EVENT_CAST (item->object)));
-      }
-      item->destroy (item);
-    }
-  } else {
-    if (eos) {
-      goto end_of_manifest;
-    } else if (eop) {
-      gst_dash_demux_advance_period (demux);
-      demux->need_segment = TRUE;
-    }
+    /* And send a newsegment */
+    gst_pad_push_event (stream->pad, stream->pending_segment);
+    stream->pending_segment = NULL;
   }
 
-  if (selected_stream) {
-    GST_DASH_DEMUX_CLIENT_LOCK (demux);
-    if (ret != selected_stream->last_ret) {
-      gst_task_start (selected_stream->download_task);
-      selected_stream->last_ret = ret;
-    }
-    switch (selected_stream->last_ret) {
-      case GST_FLOW_NOT_LINKED:
-        gst_data_queue_set_flushing (selected_stream->queue, TRUE);
-        break;
-      default:
-        break;
-    }
-
-    /* combine flow returns */
-    ret = gst_dash_demux_combine_flows (demux);
-    if (ret < 0) {
-      goto error;
-    }
-    GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-  }
+  /* make timestamp start from 0 by subtracting the offset */
+  timestamp -= demux->timestamp_offset;
+  duration = GST_BUFFER_DURATION (buffer);
 
-end:
-  GST_INFO_OBJECT (demux, "Leaving streaming task");
-  return;
+  GST_BUFFER_TIMESTAMP (buffer) = timestamp;
 
-flushing:
-  {
-    GST_WARNING_OBJECT (demux, "Flushing, leaving streaming task");
-    gst_task_stop (demux->stream_task);
-    return;
-  }
+  GST_DEBUG_OBJECT (stream->pad,
+      "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
+      GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT, buffer,
+      GST_BUFFER_OFFSET (buffer), stream->index,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+  ret = gst_pad_push (stream->pad, gst_buffer_ref (buffer));
+  GST_DEBUG_OBJECT (stream->pad, "Push result: %d %s", ret,
+      gst_flow_get_name (ret));
 
-end_of_manifest:
-  {
-    GST_INFO_OBJECT (demux, "Reached end of manifest, stopping streaming task");
-    gst_task_stop (demux->stream_task);
-    return;
-  }
+  demux->segment.position = timestamp;
+  stream->position = timestamp;
+  if (GST_CLOCK_TIME_IS_VALID (duration))
+    stream->position += duration;
 
-error:
-  {
-    GST_ERROR_OBJECT (demux,
-        "Error pushing buffer: %s... terminating the demux",
-        gst_flow_get_name (ret));
-    gst_task_stop (demux->stream_task);
-    return;
-  }
+  return ret;
 }
 
 static void
@@ -1427,17 +1162,15 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
     gst_caps_unref (stream->input_caps);
     stream->input_caps = NULL;
   }
+  if (stream->pending_segment) {
+    gst_event_unref (stream->pending_segment);
+  }
   if (stream->pad) {
     gst_object_unref (stream->pad);
     stream->pad = NULL;
   }
-  if (stream->queue) {
-    g_object_unref (stream->queue);
-    stream->queue = NULL;
-  }
   if (stream->download_task) {
     gst_task_stop (stream->download_task);
-    gst_task_join (stream->download_task);
     gst_object_unref (stream->download_task);
     g_rec_mutex_clear (&stream->download_task_lock);
   }
@@ -1461,8 +1194,8 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
   demux->end_of_period = FALSE;
   demux->end_of_manifest = FALSE;
 
-  demux->cancelled = TRUE;
   gst_dash_demux_stop (demux);
+  gst_dash_demux_wait_stop (demux);
   if (demux->downloader)
     gst_uri_downloader_reset (demux->downloader);
 
@@ -1508,36 +1241,6 @@ gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
   demux->cancelled = FALSE;
 }
 
-#ifndef GST_DISABLE_GST_DEBUG
-static GstClockTime
-gst_dash_demux_get_buffering_time (GstDashDemux * demux)
-{
-  GstClockTime buffer_time = GST_CLOCK_TIME_NONE;
-  GSList *iter;
-
-  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
-    GstClockTime btime = gst_dash_demux_stream_get_buffering_time (iter->data);
-
-    if (!GST_CLOCK_TIME_IS_VALID (buffer_time) || buffer_time > btime)
-      buffer_time = btime;
-  }
-
-  if (!GST_CLOCK_TIME_IS_VALID (buffer_time))
-    buffer_time = 0;
-  return buffer_time;
-}
-
-static GstClockTime
-gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream)
-{
-  GstDataQueueSize level;
-
-  gst_data_queue_get_level (stream->queue, &level);
-
-  return (GstClockTime) level.time;
-}
-#endif
-
 static GstFlowReturn
 gst_dash_demux_refresh_mpd (GstDashDemux * demux)
 {
@@ -1687,23 +1390,37 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux)
   return GST_FLOW_OK;
 }
 
+static GstFlowReturn
+gst_dash_demux_combine_flows (GstDashDemux * demux)
+{
+  gboolean all_notlinked = TRUE;
+  GSList *iter;
+
+  for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+    GstDashDemuxStream *stream = iter->data;
+
+    if (stream->last_ret != GST_FLOW_NOT_LINKED)
+      all_notlinked = FALSE;
+
+    if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
+        || stream->last_ret == GST_FLOW_FLUSHING)
+      return stream->last_ret;
+  }
+  if (all_notlinked)
+    return GST_FLOW_NOT_LINKED;
+  return GST_FLOW_OK;
+}
+
+
 /* gst_dash_demux_stream_download_loop:
  * 
  * Loop for the "download' task that fetches fragments based on the 
  * selected representations.
  * 
- * Startup: 
- * 
- * The task is started from the stream loop.
- * 
  * During playback:  
  * 
  * It sequentially fetches fragments corresponding to the current 
- * representations and pushes them into a queue.
- * 
- * It tries to maintain the number of queued items within a predefined 
- * range: if the queue is full, it will pause, checking every 100 ms if 
- * it needs to restart downloading fragments.
+ * representations and pushes them downstream
  * 
  * When a new set of fragments has been downloaded, it evaluates the
  * download time to check if we can or should switch to a different 
@@ -1713,36 +1430,22 @@ gst_dash_demux_refresh_mpd (GstDashDemux * demux)
  * 
  * The task will exit when it encounters an error or when the end of the
  * manifest has been reached.
- * 
  */
 static void
 gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
 {
-  GstClockTime fragment_ts = GST_CLOCK_TIME_NONE;
   GstDashDemux *demux = stream->demux;
   GstFlowReturn flow_ret = GST_FLOW_OK;
+  GstClockTime fragment_ts = GST_CLOCK_TIME_NONE;
+  GstEvent *caps_event;
 
   GST_LOG_OBJECT (stream->pad, "Starting download loop");
 
   GST_DASH_DEMUX_CLIENT_LOCK (demux);
-  if (stream->last_ret < GST_FLOW_OK) {
-    if (demux->cancelled) {
-      GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-      goto cancelled;
-    }
-    GST_DEBUG_OBJECT (stream->pad, "Download loop waiting due to flow return: "
-        "%d %s", stream->last_ret, gst_flow_get_name (stream->last_ret));
-    gst_task_pause (stream->download_task);
-    GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-    return;
-  }
-  GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-
   if (demux->cancelled) {
     goto cancelled;
   }
 
-  GST_DASH_DEMUX_CLIENT_LOCK (demux);
   if (gst_mpd_client_is_live (demux->client)
       && demux->client->mpd_uri != NULL) {
     switch (gst_dash_demux_refresh_mpd (demux)) {
@@ -1754,43 +1457,69 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
     }
   }
 
-  GST_DEBUG_OBJECT (stream->pad, "End of manifest: %d", demux->end_of_manifest);
-
   /* try to switch to another set of representations if needed */
-  gst_dash_demux_stream_select_representation_unlocked (stream);
+  caps_event = gst_dash_demux_stream_select_representation_unlocked (stream);
   GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
 
+  if (G_UNLIKELY (caps_event && !gst_pad_push_event (stream->pad, caps_event))) {
+    /* TODO fail if caps is rejected */
+  }
+
   /* fetch the next fragment */
-  flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_ts);
+  flow_ret = gst_dash_demux_stream_get_next_fragment (stream, &fragment_ts);
 
+  GST_DASH_DEMUX_CLIENT_LOCK (demux);
+  if (demux->cancelled) {
+    goto cancelled;
+  }
+
+  stream->last_ret = flow_ret;
   switch (flow_ret) {
     case GST_FLOW_OK:
       break;
+    case GST_FLOW_NOT_LINKED:
+      gst_task_pause (stream->download_task);
+      if (gst_dash_demux_combine_flows (demux) == GST_FLOW_NOT_LINKED) {
+        GST_ELEMENT_ERROR (demux, STREAM, FAILED,
+            (_("Internal data stream error.")),
+            ("stream stopped, reason %s",
+                gst_flow_get_name (GST_FLOW_NOT_LINKED)));
+      }
+      break;
+
+    case GST_FLOW_FLUSHING:
+      gst_dash_demux_stop (demux);
+      break;
+
     case GST_FLOW_EOS:
-      GST_DASH_DEMUX_CLIENT_LOCK (demux);
+      GST_DEBUG_OBJECT (stream->pad, "EOS");
+      stream->stream_eos = TRUE;
       if (gst_dash_demux_all_streams_eop (demux)) {
         GST_INFO_OBJECT (stream->pad, "Reached the end of the Period");
 
         if (gst_mpd_client_has_next_period (demux->client)) {
+
+          GST_INFO_OBJECT (demux, "Starting next period");
           /* setup video, audio and subtitle streams, starting from the next Period */
           if (!gst_mpd_client_set_period_index (demux->client,
                   gst_mpd_client_get_period_index (demux->client) + 1)
               || !gst_dash_demux_setup_all_streams (demux)) {
           }
+
+          stream->last_ret = GST_FLOW_OK;
           /* start playing from the first segment of the new period */
           gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
-          demux->end_of_period = FALSE;
-          gst_task_start (demux->stream_task);
-        } else if (!demux->end_of_manifest) {
-          GST_INFO_OBJECT (stream->pad, "Reached the end of the manifest file");
-          demux->end_of_manifest = TRUE;
-          gst_task_start (demux->stream_task);
+
+          gst_dash_demux_advance_period (demux);
+
+          gst_dash_demux_resume_download_task (demux);
           GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
-          goto end_of_manifest;
+
+          /* This pad is now finished, we lost its reference */
+          return;
         }
       }
       gst_task_pause (stream->download_task);
-      GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
       break;
     case GST_FLOW_ERROR:
       /* Download failed 'by itself'
@@ -1847,21 +1576,23 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
       break;
   }
 
-  if (demux->cancelled) {
-    goto cancelled;
-  }
-
-  GST_INFO_OBJECT (stream->pad, "Internal buffering : %" G_GUINT64_FORMAT " s",
-      gst_dash_demux_get_buffering_time (demux) / GST_SECOND);
-  demux->client->update_failed_count = 0;
+  if (G_LIKELY (stream->last_ret != GST_FLOW_ERROR))
+    demux->client->update_failed_count = 0;
 
 quit:
+  GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
+
+  if (G_UNLIKELY (stream->last_ret == GST_FLOW_EOS)) {
+    gst_pad_push_event (stream->pad, gst_event_new_eos ());
+  }
+
   GST_DEBUG_OBJECT (stream->pad, "Finishing download loop");
   return;
 
 cancelled:
   {
     GST_WARNING_OBJECT (stream->pad, "Cancelled, leaving download task");
+    GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
     return;
   }
 
@@ -1869,6 +1600,8 @@ end_of_manifest:
   {
     GST_INFO_OBJECT (stream->pad, "End of manifest, leaving download task");
     gst_task_stop (stream->download_task);
+    /* TODO should push eos? */
+    GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
     return;
   }
 
@@ -1877,17 +1610,12 @@ error_downloading:
     GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
         ("Could not fetch the next fragment, leaving download task"), (NULL));
     gst_task_stop (stream->download_task);
+    GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
     return;
   }
 }
 
 static void
-gst_dash_demux_resume_stream_task (GstDashDemux * demux)
-{
-  gst_task_start (demux->stream_task);
-}
-
-static void
 gst_dash_demux_resume_download_task (GstDashDemux * demux)
 {
   GSList *iter;
@@ -1904,7 +1632,7 @@ gst_dash_demux_resume_download_task (GstDashDemux * demux)
  * Select the most appropriate media representation based on current target 
  * bitrate.
  */
-static gboolean
+static GstEvent *
 gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
     stream)
 {
@@ -1943,20 +1671,17 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream *
         stream->index, new_index, rep->bandwidth);
     if (gst_mpd_client_setup_representation (demux->client, active_stream, rep)) {
       stream->need_header = TRUE;
-      stream->has_data_queued = FALSE;
       GST_INFO_OBJECT (demux, "Switching bitrate to %d",
           active_stream->cur_representation->bandwidth);
       gst_caps_unref (stream->input_caps);
       stream->input_caps = gst_dash_demux_get_input_caps (demux, active_stream);
-      gst_dash_demux_stream_push_event (stream,
-          gst_event_new_caps (stream->input_caps));
-      return TRUE;
+      return gst_event_new_caps (gst_caps_ref (stream->input_caps));
     } else {
       GST_WARNING_OBJECT (demux, "Can not switch representation, aborting...");
-      return FALSE;
+      return NULL;
     }
   }
-  return FALSE;
+  return NULL;
 }
 
 static GstBuffer *
@@ -2151,8 +1876,8 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
   }
 }
 
-static gboolean
-gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
+static GstBuffer *
+gst_dash_demux_stream_download_fragment (GstDashDemux * demux,
     GstDashDemuxStream * stream, guint64 * size_buffer,
     GstClockTime * download_time)
 {
@@ -2161,7 +1886,7 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
   GTimeVal now;
   GTimeVal start;
   guint stream_idx = stream->index;
-  GstBuffer *buffer;
+  GstBuffer *buffer = NULL;
   GstBuffer *header_buffer;
   GstMediaFragmentInfo fragment;
 
@@ -2215,17 +1940,12 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
      */
     stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
 
-    gst_data_queue_set_flushing (stream->queue, FALSE);
     stream->restart_download = FALSE;
-    gst_task_start (demux->stream_task);
   }
 
   if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) {
-    gboolean catch_up = FALSE;
-
     g_get_current_time (&start);
-    GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
-    GST_INFO_OBJECT (demux,
+    GST_INFO_OBJECT (stream->pad,
         "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%"
         GST_TIME_FORMAT " Range:%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
         fragment.uri, GST_TIME_ARGS (fragment.timestamp),
@@ -2237,14 +1957,14 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
 
     if (download == NULL) {
       gst_media_fragment_info_clear (&fragment);
-      return FALSE;
+      return NULL;
     }
 
     active_stream = stream->active_stream;
     if (active_stream == NULL) {
       gst_media_fragment_info_clear (&fragment);
       g_object_unref (download);
-      return FALSE;
+      return NULL;
     }
 
     buffer = gst_fragment_get_buffer (download);
@@ -2259,7 +1979,7 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
       if (!uri)                 /* fallback to default media uri */
         uri = fragment.uri;
 
-      GST_DEBUG_OBJECT (demux,
+      GST_DEBUG_OBJECT (stream->pad,
           "Fragment index download: %s %" G_GINT64_FORMAT "-%"
           G_GINT64_FORMAT, uri, fragment.index_range_start,
           fragment.index_range_end);
@@ -2293,41 +2013,15 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
         gst_mpd_client_get_segment_index (active_stream) - 1;
 
     gst_media_fragment_info_clear (&fragment);
-
-    /* Check if this stream is on catch up mode */
-    if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
-      GST_DEBUG_OBJECT (stream->pad,
-          "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
-          GST_TIME_ARGS (demux->segment.position),
-          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
-      if (GST_BUFFER_TIMESTAMP (buffer) < demux->segment.position) {
-        catch_up = TRUE;
-      } else {
-        stream->last_ret = GST_FLOW_OK;
-        gst_task_start (demux->stream_task);
-      }
-    }
-
     *size_buffer += gst_buffer_get_size (buffer);
-    if (catch_up) {
-      GstFlowReturn ret;
-
-      ret = gst_pad_push (stream->pad, buffer);
-      if (G_LIKELY (ret == GST_FLOW_OK))
-        stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
-      /* TODO handle return */
-    } else {
-      gst_dash_demux_stream_push_data (stream, buffer);
-      stream->has_data_queued = TRUE;
-    }
   } else {
     GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
         stream, stream->index);
   }
-  return TRUE;
+  return buffer;
 }
 
-/* gst_dash_demux_get_next_fragment:
+/* gst_dash_demux_stream_get_next_fragment:
  *
  * Get the next fragments for the stream with the earlier timestamp.
  * It returns the selected timestamp so the caller can deal with
@@ -2336,18 +2030,19 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
  * This function uses the generic URI downloader API.
  *
  * Returns FALSE if an error occured while downloading fragments
- * 
  */
 static GstFlowReturn
-gst_dash_demux_get_next_fragment (GstDashDemux * demux,
-    GstDashDemuxStream * stream, GstClockTime * selected_ts)
+gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream * stream,
+    GstClockTime * ts)
 {
   guint64 buffer_size = 0;
   GstClockTime diff;
   gboolean end_of_period = TRUE;
-  GstClockTime ts;
+  GstBuffer *buffer = NULL;
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstDashDemux *demux = stream->demux;
 
-  if (stream->download_end_of_period)
+  if (stream->stream_eos)
     return GST_FLOW_EOS;
 
   if (stream->last_ret == GST_FLOW_NOT_LINKED) {
@@ -2356,11 +2051,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
     return GST_FLOW_NOT_LINKED;
   }
 
-  if (gst_mpd_client_get_next_fragment_timestamp (demux->client,
-          stream->index, &ts)) {
-  } else {
-    GstEvent *event = NULL;
-
+  if (!gst_mpd_client_get_next_fragment_timestamp (demux->client,
+          stream->index, ts)) {
     GST_INFO_OBJECT (demux,
         "This Period doesn't contain more fragments for stream %u",
         stream->index);
@@ -2371,36 +2063,34 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
       end_of_period = FALSE;
       return GST_FLOW_OK;       /* TODO wait */
     }
-
-    if (gst_mpd_client_has_next_period (demux->client)) {
-      event = gst_event_new_dash_eop ();
-    } else {
-      GST_DEBUG_OBJECT (demux,
-          "No more fragments or periods for this stream, setting EOS");
-      event = gst_event_new_eos ();
-    }
-    stream->download_end_of_period = TRUE;
-    gst_dash_demux_stream_push_event (stream, event);
+    return GST_FLOW_EOS;
   }
 
   /*
    * If this is a live stream, check the segment end time to make sure
    * it is available to download
    */
-  if (stream && gst_mpd_client_is_live (demux->client) &&
+  if (gst_mpd_client_is_live (demux->client) &&
       demux->client->mpd_node->minimumUpdatePeriod != -1) {
 
     gst_dash_demux_wait_for_fragment_to_be_available (demux, stream);
   }
 
   /* Get the fragment corresponding to each stream index */
-  if (stream) {
-    gst_dash_demux_get_next_fragment_for_stream (demux, stream, &buffer_size,
-        &diff);
-    end_of_period = FALSE;
-  }
+  buffer =
+      gst_dash_demux_stream_download_fragment (demux, stream,
+      &buffer_size, &diff);
+  end_of_period = FALSE;
 
   demux->end_of_period = end_of_period;
+
+  if (buffer) {
+    ret = gst_dash_demux_push (stream, buffer);
+  } else {
+    GST_WARNING_OBJECT (stream->pad, "Failed to download fragment");
+    return GST_FLOW_ERROR;
+  }
+
   if (end_of_period)
     return GST_FLOW_EOS;
 
@@ -2419,7 +2109,7 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
         G_GUINT64_FORMAT " Ko in %.2f s)", stream->index, brate / 1000,
         buffer_size / 1024, ((double) diff / GST_SECOND));
   }
-  return GST_FLOW_OK;
+  return ret;
 }
 
 static void
index 935eec1..935e7e9 100644 (file)
@@ -68,38 +68,11 @@ struct _GstDashDemuxStream
   GstClockTime position;
   gboolean restart_download;
 
-  /*
-   * Need to store the status for the download and
-   * stream tasks separately as they are working at
-   * different points of the stream timeline.
-   * The download task is ahead of the stream.
-   *
-   * The download_end_of_period is set when a stream
-   * has already downloaded all fragments for the current
-   * period.
-   *
-   * The stream_end_of_period is set when a stream
-   * has pushed all fragments for the current period
-   */
-  gboolean download_end_of_period;
-  gboolean stream_end_of_period;
+  GstEvent *pending_segment;
 
   gboolean stream_eos;
   gboolean need_header;
 
-  /* tracks if a stream has enqueued data
-   * after a pad switch.
-   * This is required to prevent pads being
-   * added to the demuxer and having no data
-   * pushed to it before another pad switch
-   * as this might make downstream elements
-   * unhappy and error out if they get
-   * an EOS without receiving any input
-   */
-  gboolean has_data_queued;
-
-  GstDataQueue *queue;
-
   /* Download task */
   GMutex download_mutex;
   GCond download_cond;
@@ -127,7 +100,6 @@ struct _GstDashDemux
   GSList *next_periods;
 
   GstSegment segment;
-  gboolean need_segment;
   GstClockTime timestamp_offset;
 
   GstBuffer *manifest;
@@ -143,10 +115,6 @@ struct _GstDashDemux
   gfloat bandwidth_usage;       /* Percentage of the available bandwidth to use       */
   guint64 max_bitrate;          /* max of bitrate supported by target decoder         */
 
-  /* Streaming task */
-  GstTask *stream_task;
-  GRecMutex stream_task_lock;
-
   gboolean cancelled;
 
   /* Manifest update */