dashdemux: handle multiple languages
authorThiago Santos <ts.santos@sisa.samsung.com>
Wed, 4 Dec 2013 14:30:22 +0000 (11:30 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Tue, 24 Dec 2013 20:07:51 +0000 (17:07 -0300)
Handle multiple languages by using the not-linked return to stop
the download task for that stream. It can be reactivated when
a reconfigure event is received. Stopping the unused streams is
relevant to save network bandwidth

ext/dash/gstdashdemux.c
ext/dash/gstdashdemux.h

index 931a9f1..4ed1631 100644 (file)
@@ -181,8 +181,11 @@ enum
 #define DEFAULT_FAILED_COUNT 3
 #define DOWNLOAD_RATE_HISTORY_MAX 3
 
-#define GST_DASH_DEMUX_DOWNLOAD_LOCK(d) g_mutex_lock (&d->download_mutex)
-#define GST_DASH_DEMUX_DOWNLOAD_UNLOCK(d) g_mutex_unlock (&d->download_mutex)
+#define GST_DASH_DEMUX_CLIENT_LOCK(d) g_mutex_lock (&d->client_lock)
+#define GST_DASH_DEMUX_CLIENT_UNLOCK(d) g_mutex_unlock (&d->client_lock)
+
+#define GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK(s) g_mutex_lock (&s->download_mutex)
+#define GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK(s) g_mutex_unlock (&s->download_mutex)
 
 /* Custom internal event to signal end of period */
 #define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED)
@@ -225,7 +228,7 @@ static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux,
     GstDashDemuxStream * stream, GstActiveStream ** active_stream,
     GstClockTime * next_ts);
 static gboolean gst_dash_demux_advance_period (GstDashDemux * demux);
-static void gst_dash_demux_download_wait (GstDashDemux * demux,
+static void gst_dash_demux_download_wait (GstDashDemuxStream * stream,
     GstClockTime time_diff);
 
 static void gst_dash_demux_expose_streams (GstDashDemux * demux);
@@ -261,15 +264,13 @@ gst_dash_demux_dispose (GObject * obj)
     demux->stream_task = NULL;
   }
 
-  g_cond_clear (&demux->download_cond);
-  g_mutex_clear (&demux->download_mutex);
-
   if (demux->downloader != NULL) {
     g_object_unref (demux->downloader);
     demux->downloader = NULL;
   }
 
   g_mutex_clear (&demux->streams_lock);
+  g_mutex_clear (&demux->client_lock);
 
   G_OBJECT_CLASS (parent_class)->dispose (obj);
 }
@@ -345,10 +346,6 @@ gst_dash_demux_init (GstDashDemux * demux)
   demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
   demux->max_bitrate = DEFAULT_MAX_BITRATE;
 
-  /* Updates task */
-  g_cond_init (&demux->download_cond);
-  g_mutex_init (&demux->download_mutex);
-
   /* Streaming task */
   g_rec_mutex_init (&demux->stream_task_lock);
   demux->stream_task =
@@ -356,6 +353,7 @@ gst_dash_demux_init (GstDashDemux * demux)
   gst_task_set_lock (demux->stream_task, &demux->stream_task_lock);
 
   g_mutex_init (&demux->streams_lock);
+  g_mutex_init (&demux->client_lock);
 }
 
 static void
@@ -470,6 +468,38 @@ gst_dash_demux_stream_push_data (GstDashDemuxStream * stream,
   gst_data_queue_push (stream->queue, item);
 }
 
+static void
+gst_dash_demux_stream_seek (GstDashDemuxStream * stream,
+    GstClockTime target_pos)
+{
+  gint seg_i;
+  guint current_sequence = 0;
+  GstActiveStream *active_stream;
+  GstMediaSegment *chunk;
+  GstClockTime current_pos = 0;
+  GstDashDemux *demux = stream->demux;
+
+  active_stream =
+      gst_mpdparser_get_active_stream_by_index (demux->client, stream->index);
+  for (seg_i = 0; seg_i < active_stream->segments->len; seg_i++) {
+    chunk = g_ptr_array_index (active_stream->segments, seg_i);
+    current_pos = chunk->start_time;
+    /* current_sequence = chunk->number; */
+    GST_DEBUG_OBJECT (demux, "current_pos:%" GST_TIME_FORMAT
+        " <= target_pos:%" GST_TIME_FORMAT " duration:%"
+        GST_TIME_FORMAT, GST_TIME_ARGS (current_pos),
+        GST_TIME_ARGS (target_pos), GST_TIME_ARGS (chunk->duration));
+    if (current_pos <= target_pos && target_pos < current_pos + chunk->duration) {
+      GST_DEBUG_OBJECT (demux,
+          "selecting sequence %d for stream %" GST_PTR_FORMAT,
+          current_sequence, stream);
+      break;
+    }
+    current_sequence++;
+  }
+  gst_mpd_client_set_segment_index (active_stream, current_sequence);
+}
+
 static gboolean
 gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
@@ -487,9 +517,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
       gint64 start, stop;
       GList *list;
       GstClockTime current_pos, target_pos;
-      guint current_sequence, current_period;
-      GstActiveStream *active_stream;
-      GstMediaSegment *chunk;
+      guint current_period;
       GstStreamPeriod *period;
       GSList *iter;
       gboolean update;
@@ -575,32 +603,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
         /* Update the current sequence on all streams */
         for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
-          GstDashDemuxStream *stream = iter->data;
-          gint seg_i;
-
-          active_stream =
-              gst_mpdparser_get_active_stream_by_index (demux->client,
-              stream->index);
-          current_pos = 0;
-          current_sequence = 0;
-          for (seg_i = 0; seg_i < active_stream->segments->len; seg_i++) {
-            chunk = g_ptr_array_index (active_stream->segments, seg_i);
-            current_pos = chunk->start_time;
-            /* current_sequence = chunk->number; */
-            GST_DEBUG_OBJECT (demux, "current_pos:%" GST_TIME_FORMAT
-                " <= target_pos:%" GST_TIME_FORMAT " duration:%"
-                GST_TIME_FORMAT, GST_TIME_ARGS (current_pos),
-                GST_TIME_ARGS (target_pos), GST_TIME_ARGS (chunk->duration));
-            if (current_pos <= target_pos
-                && target_pos < current_pos + chunk->duration) {
-              GST_DEBUG_OBJECT (demux,
-                  "selecting sequence %d for stream %" GST_PTR_FORMAT,
-                  current_sequence, stream);
-              break;
-            }
-            current_sequence++;
-          }
-          gst_mpd_client_set_segment_index (active_stream, current_sequence);
+          gst_dash_demux_stream_seek (iter->data, target_pos);
         }
 
         if (flags & GST_SEEK_FLAG_FLUSH) {
@@ -624,6 +627,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
           GstDashDemuxStream *stream = iter->data;
           gst_data_queue_set_flushing (stream->queue, FALSE);
+          stream->last_ret = GST_FLOW_OK;
         }
         demux->timestamp_offset = 0;
         demux->need_segment = TRUE;
@@ -636,6 +640,28 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
       return TRUE;
     }
+    case GST_EVENT_RECONFIGURE:{
+      GSList *iter;
+
+      for (iter = demux->streams; iter; iter = g_slist_next (iter)) {
+        GstDashDemuxStream *stream = iter->data;
+
+        if (stream->pad == pad) {
+          GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream);
+          if (stream->last_ret == GST_FLOW_NOT_LINKED) {
+            stream->last_ret = GST_FLOW_OK;
+            stream->restart_download = TRUE;
+            stream->need_header = TRUE;
+            GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
+          }
+          gst_task_start (stream->download_task);
+          GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
+          gst_event_unref (event);
+          return TRUE;
+        }
+      }
+    }
+      break;
     default:
       break;
   }
@@ -703,6 +729,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux)
         gst_task_new ((GstTaskFunction) gst_dash_demux_stream_download_loop,
         stream, NULL);
     gst_task_set_lock (stream->download_task, &stream->download_task_lock);
+    g_cond_init (&stream->download_cond);
+    g_mutex_init (&stream->download_mutex);
 
     stream->index = i;
     stream->input_caps = caps;
@@ -1018,13 +1046,10 @@ gst_dash_demux_stop (GstDashDemux * demux)
 
     gst_data_queue_set_flushing (stream->queue, TRUE);
     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
-      GST_TASK_SIGNAL (stream->download_task);
+      stream->last_ret = GST_FLOW_FLUSHING;
+      stream->need_header = TRUE;
       gst_task_stop (stream->download_task);
-      g_mutex_lock (&demux->download_mutex);
-      g_cond_signal (&demux->download_cond);
-      g_mutex_unlock (&demux->download_mutex);
-      g_rec_mutex_lock (&stream->download_task_lock);
-      g_rec_mutex_unlock (&stream->download_task_lock);
+      GST_TASK_SIGNAL (stream->download_task);
       gst_task_join (stream->download_task);
     }
   }
@@ -1154,7 +1179,6 @@ static void
 gst_dash_demux_stream_loop (GstDashDemux * demux)
 {
   GstFlowReturn ret;
-  GstActiveStream *active_stream;
   GSList *iter;
   GstClockTime best_time;
   GstDashDemuxStream *selected_stream;
@@ -1172,6 +1196,9 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
 
     GST_DEBUG_OBJECT (demux, "Peeking stream %d", stream->index);
 
+    if (stream->last_ret == GST_FLOW_NOT_LINKED)
+      continue;
+
     if (stream->stream_eos) {
       GST_DEBUG_OBJECT (demux, "Stream %d is eos, skipping", stream->index);
       continue;
@@ -1221,6 +1248,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
     }
   }
 
+  ret = GST_FLOW_OK;
   if (selected_stream) {
     GstDataQueueItem *item;
 
@@ -1232,13 +1260,9 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
 
     if (G_LIKELY (GST_IS_BUFFER (item->object))) {
       GstBuffer *buffer;
-      GstClockTime timestamp;
+      GstClockTime timestamp, duration;
 
       buffer = GST_BUFFER_CAST (item->object);
-      active_stream =
-          gst_mpdparser_get_active_stream_by_index (demux->client,
-          selected_stream->index);
-
       timestamp = GST_BUFFER_TIMESTAMP (buffer);
 
       if (demux->need_segment) {
@@ -1255,32 +1279,36 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
       }
       /* make timestamp start from 0 by subtracting the offset */
       timestamp -= demux->timestamp_offset;
+      duration = GST_BUFFER_DURATION (buffer);
 
       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
 
       GST_DEBUG_OBJECT (demux,
           "Pushing fragment ts: %" GST_TIME_FORMAT " at pad %s",
           GST_TIME_ARGS (timestamp), GST_PAD_NAME (selected_stream->pad));
-#if 0
       GST_DEBUG_OBJECT (demux,
-          "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%"
-          GST_TIME_FORMAT " at pad: %s:%s", buffer, GST_BUFFER_OFFSET (buffer),
-          selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+          "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%"
+          GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " at pad: %s:%s", buffer,
+          GST_BUFFER_OFFSET (buffer), selected_stream->index,
+          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
           GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
           GST_DEBUG_PAD_NAME (selected_stream->pad));
-#endif
       ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer));
+      GST_DEBUG_OBJECT (demux, "Push result: %d %s", ret,
+          gst_flow_get_name (ret));
+
       demux->segment.position = timestamp;
+      selected_stream->position = timestamp;
+      if (GST_CLOCK_TIME_IS_VALID (duration))
+        selected_stream->position += duration;
 
       item->destroy (item);
-      if ((ret != GST_FLOW_OK) && (active_stream
-              && active_stream->mimeType == GST_STREAM_VIDEO))
-        goto error_pushing;
     } else {
       /* a GstEvent */
       if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) {
         selected_stream->stream_end_of_period = TRUE;
         selected_stream->stream_eos = TRUE;
+        ret = GST_FLOW_EOS;
       } else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) {
         selected_stream->stream_end_of_period = TRUE;
       }
@@ -1298,6 +1326,19 @@ gst_dash_demux_stream_loop (GstDashDemux * demux)
       gst_dash_demux_advance_period (demux);
     }
   }
+  GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (selected_stream);
+  if (ret != selected_stream->last_ret) {
+    gst_task_start (selected_stream->download_task);
+    selected_stream->last_ret = ret;
+  }
+  switch (selected_stream->last_ret) {
+    case GST_FLOW_NOT_LINKED:
+      gst_data_queue_set_flushing (selected_stream->queue, TRUE);
+      break;
+    default:
+      break;
+  }
+  GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (selected_stream);
 
 end:
   GST_INFO_OBJECT (demux, "Leaving streaming task");
@@ -1317,6 +1358,7 @@ end_of_manifest:
     return;
   }
 
+#if 0
 error_pushing:
   {
     /* FIXME: handle error */
@@ -1326,6 +1368,7 @@ error_pushing:
     gst_task_stop (demux->stream_task);
     return;
   }
+#endif
 }
 
 static void
@@ -1350,6 +1393,8 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream)
     gst_object_unref (stream->download_task);
     g_rec_mutex_clear (&stream->download_task_lock);
   }
+  g_cond_clear (&stream->download_cond);
+  g_mutex_clear (&stream->download_mutex);
 
   g_free (stream);
 }
@@ -1630,26 +1675,43 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
   GstDashDemux *demux = stream->demux;
   GstFlowReturn flow_ret = GST_FLOW_OK;
 
-  GST_LOG_OBJECT (demux, "Starting download loop %p %s:%s", stream,
-      GST_DEBUG_PAD_NAME (stream->pad));
+  GST_LOG_OBJECT (stream->pad, "Starting download loop");
 
-  GST_DASH_DEMUX_DOWNLOAD_LOCK (demux);
+  GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream);
+  if (stream->last_ret < GST_FLOW_OK) {
+    if (demux->cancelled) {
+      GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
+      goto cancelled;
+    }
+    GST_DEBUG_OBJECT (stream->pad, "Download loop waiting due to flow return: "
+        "%d %s", stream->last_ret, gst_flow_get_name (stream->last_ret));
+    gst_task_pause (stream->download_task);
+    GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
+    return;
+  }
+  GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
+
+  if (demux->cancelled) {
+    goto cancelled;
+  }
+
+  GST_DASH_DEMUX_CLIENT_LOCK (demux);
   if (gst_mpd_client_is_live (demux->client)
       && demux->client->mpd_uri != NULL) {
     switch (gst_dash_demux_refresh_mpd (demux)) {
       case GST_FLOW_EOS:
-        GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux);
+        GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
         goto end_of_manifest;
       default:
         break;
     }
   }
 
-  GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest);
+  GST_DEBUG_OBJECT (stream->pad, "End of manifest: %d", demux->end_of_manifest);
 
   /* try to switch to another set of representations if needed */
   gst_dash_demux_stream_select_representation_unlocked (stream);
-  GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux);
+  GST_DASH_DEMUX_CLIENT_UNLOCK (demux);
 
   /* fetch the next fragment */
   flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_stream,
@@ -1659,13 +1721,14 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
     case GST_FLOW_OK:
       break;
     case GST_FLOW_EOS:
+      GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream);
       if (demux->end_of_period) {
-        GST_INFO_OBJECT (demux, "Reached the end of the Period");
+        GST_INFO_OBJECT (stream->pad, "Reached the end of the Period");
         /* setup video, audio and subtitle streams, starting from the next Period */
         if (!gst_mpd_client_set_period_index (demux->client,
                 gst_mpd_client_get_period_index (demux->client) + 1)
             || !gst_dash_demux_setup_all_streams (demux)) {
-          GST_INFO_OBJECT (demux, "Reached the end of the manifest file");
+          GST_INFO_OBJECT (stream->pad, "Reached the end of the manifest file");
           demux->end_of_manifest = TRUE;
           gst_task_start (demux->stream_task);
           goto end_of_manifest;
@@ -1674,6 +1737,8 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
         gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0);
         demux->end_of_period = FALSE;
       }
+      gst_task_pause (stream->download_task);
+      GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream);
       break;
     case GST_FLOW_ERROR:
       /* Download failed 'by itself'
@@ -1687,7 +1752,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
         pos =
             gst_mpd_client_check_time_position (demux->client, fragment_stream,
             fragment_ts, &time_diff);
-        GST_DEBUG_OBJECT (demux,
+        GST_DEBUG_OBJECT (stream->pad,
             "Checked position for fragment ts %" GST_TIME_FORMAT
             ", res: %d, diff: %" G_GINT64_FORMAT, GST_TIME_ARGS (fragment_ts),
             pos, time_diff);
@@ -1697,7 +1762,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
           /* we're behind, try moving to the 'present' */
           GDateTime *now = g_date_time_new_now_utc ();
 
-          GST_DEBUG_OBJECT (demux,
+          GST_DEBUG_OBJECT (stream->pad,
               "Falling behind live stream, moving forward");
           gst_mpd_client_seek_to_time (demux->client, now);
           g_date_time_unref (now);
@@ -1705,10 +1770,11 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
         } else if (pos > 0) {
           /* we're ahead, wait a little */
 
-          GST_DEBUG_OBJECT (demux, "Waiting for next segment to be created");
+          GST_DEBUG_OBJECT (stream->pad,
+              "Waiting for next segment to be created");
           gst_mpd_client_set_segment_index (fragment_stream,
               fragment_stream->segment_idx - 1);
-          gst_dash_demux_download_wait (demux, time_diff);
+          gst_dash_demux_download_wait (stream, time_diff);
         } else {
           gst_mpd_client_set_segment_index (fragment_stream,
               fragment_stream->segment_idx - 1);
@@ -1719,7 +1785,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
       }
 
       if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
-        GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
+        GST_WARNING_OBJECT (stream->pad, "Could not fetch the next fragment");
         goto quit;
       } else {
         goto error_downloading;
@@ -1733,24 +1799,23 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream)
     goto cancelled;
   }
 
-  GST_INFO_OBJECT (demux, "Internal buffering : %" G_GUINT64_FORMAT " s",
+  GST_INFO_OBJECT (stream->pad, "Internal buffering : %" G_GUINT64_FORMAT " s",
       gst_dash_demux_get_buffering_time (demux) / GST_SECOND);
   demux->client->update_failed_count = 0;
 
 quit:
-  GST_DEBUG_OBJECT (demux, "Finishing download loop");
+  GST_DEBUG_OBJECT (stream->pad, "Finishing download loop");
   return;
 
 cancelled:
   {
-    GST_WARNING_OBJECT (demux, "Cancelled, leaving download task");
-    gst_task_stop (stream->download_task);
+    GST_WARNING_OBJECT (stream->pad, "Cancelled, leaving download task");
     return;
   }
 
 end_of_manifest:
   {
-    GST_INFO_OBJECT (demux, "End of manifest, leaving download task");
+    GST_INFO_OBJECT (stream->pad, "End of manifest, leaving download task");
     gst_task_stop (stream->download_task);
     return;
   }
@@ -2006,7 +2071,7 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
 
 static void
 gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
-    GstActiveStream * stream)
+    GstDashDemuxStream * dash_stream, GstActiveStream * stream)
 {
   GstDateTime *seg_end_time;
   GstDateTime *cur_time = gst_date_time_new_now_utc ();
@@ -2027,26 +2092,82 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux,
       GST_DEBUG_OBJECT (demux,
           "Selected fragment has end timestamp > now (%" PRIi64
           "), delaying download", diff);
-      gst_dash_demux_download_wait (demux, diff);
+      gst_dash_demux_download_wait (dash_stream, diff);
     }
   }
 }
 
 static gboolean
 gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
-    GstDashDemuxStream * demux_stream, guint64 * size_buffer,
+    GstDashDemuxStream * stream, guint64 * size_buffer,
     GstClockTime * download_time)
 {
   GstActiveStream *active_stream;
   GstFragment *download;
   GTimeVal now;
   GTimeVal start;
-  guint stream_idx = demux_stream->index;
+  guint stream_idx = stream->index;
   GstBuffer *buffer;
   GstBuffer *header_buffer;
   GstMediaFragmentInfo fragment;
 
+  if (G_UNLIKELY (stream->restart_download)) {
+    GstClockTime cur, ts;
+    gint64 pos;
+    GstEvent *gap;
+
+    GST_DEBUG_OBJECT (stream->pad,
+        "Reactivating stream after to reconfigure event");
+
+    cur = GST_CLOCK_TIME_IS_VALID (stream->position) ? stream->position : 0;
+
+    if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
+      ts = (GstClockTime) pos;
+      GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+    } else {
+      ts = demux->segment.position;
+      GST_DEBUG_OBJECT (stream->pad, "Downstream position query failed, "
+          "failling back to looking at other pads");
+    }
+
+    GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
+        "position %" GST_TIME_FORMAT ", current catch up %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (ts), GST_TIME_ARGS (demux->segment.position));
+
+    if (GST_CLOCK_TIME_IS_VALID (ts)) {
+      gst_dash_demux_stream_seek (stream, ts);
+
+      if (cur < ts) {
+        gap = gst_event_new_gap (cur, ts - cur);
+        gst_pad_push_event (stream->pad, gap);
+      }
+    }
+
+    /* This stream might be entering into catching up mode,
+     * meaning that it will push buffers from this same download thread
+     * until it reaches 'catch_up_timestamp'.
+     *
+     * The reason for this is that in case of stream switching, the other
+     * stream that was previously active might be blocking the stream_loop
+     * in case it is ahead enough that all queues are filled.
+     * In this case, it is possible that a downstream input-selector is
+     * blocking waiting for the currently active stream to reach the
+     * same position of the old linked stream because of the 'sync-streams'
+     * behavior.
+     *
+     * We can push from this thread up to 'catch_up_timestamp' as all other
+     * streams should be around the same timestamp.
+     */
+    stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+
+    gst_data_queue_set_flushing (stream->queue, FALSE);
+    stream->restart_download = FALSE;
+    gst_task_start (demux->stream_task);
+  }
+
   if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) {
+    gboolean catch_up = FALSE;
 
     g_get_current_time (&start);
     GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx);
@@ -2100,13 +2221,13 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
       }
     }
 
-    if (demux_stream->need_header) {
+    if (stream->need_header) {
       /* We need to fetch a new header */
       if ((header_buffer =
               gst_dash_demux_get_next_header (demux, stream_idx)) != NULL) {
         buffer = gst_buffer_append (header_buffer, buffer);
       }
-      demux_stream->need_header = FALSE;
+      stream->need_header = FALSE;
     }
     g_get_current_time (&now);
     *download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
@@ -2120,12 +2241,36 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux,
 
     gst_media_fragment_info_clear (&fragment);
 
-    gst_dash_demux_stream_push_data (demux_stream, buffer);
-    demux_stream->has_data_queued = TRUE;
+    /* Check if this stream is on catch up mode */
+    if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
+      GST_DEBUG_OBJECT (stream->pad,
+          "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
+          GST_TIME_ARGS (demux->segment.position),
+          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
+      if (GST_BUFFER_TIMESTAMP (buffer) < demux->segment.position) {
+        catch_up = TRUE;
+      } else {
+        stream->last_ret = GST_FLOW_OK;
+        gst_task_start (demux->stream_task);
+      }
+    }
+
+    if (catch_up) {
+      GstFlowReturn ret;
+
+      ret = gst_pad_push (stream->pad, buffer);
+      if (G_LIKELY (ret == GST_FLOW_OK))
+        stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+      /* TODO handle return */
+    } else {
+      gst_dash_demux_stream_push_data (stream, buffer);
+      stream->has_data_queued = TRUE;
+    }
+
     *size_buffer += gst_buffer_get_size (buffer);
   } else {
     GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d",
-        demux_stream, demux_stream->index);
+        stream, stream->index);
   }
   return TRUE;
 }
@@ -2196,7 +2341,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
   if (stream && gst_mpd_client_is_live (demux->client) &&
       demux->client->mpd_node->minimumUpdatePeriod != -1) {
 
-    gst_dash_demux_wait_for_fragment_to_be_available (demux, *active_stream);
+    gst_dash_demux_wait_for_fragment_to_be_available (demux, stream,
+        *active_stream);
   }
 
   /* Get the fragment corresponding to each stream index */
@@ -2229,12 +2375,13 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux,
 }
 
 static void
-gst_dash_demux_download_wait (GstDashDemux * demux, GstClockTime time_diff)
+gst_dash_demux_download_wait (GstDashDemuxStream * stream,
+    GstClockTime time_diff)
 {
   gint64 end_time = g_get_monotonic_time () + time_diff / GST_USECOND;
 
-  GST_DEBUG_OBJECT (demux, "Download waiting for %" GST_TIME_FORMAT,
+  GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
       GST_TIME_ARGS (time_diff));
-  g_cond_wait_until (&demux->download_cond, &demux->download_mutex, end_time);
-  GST_DEBUG_OBJECT (demux, "Download finished waiting");
+  g_cond_wait_until (&stream->download_cond, &stream->download_mutex, end_time);
+  GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
 }
index 84baa8e..604dc4a 100644 (file)
@@ -63,6 +63,10 @@ struct _GstDashDemuxStream
 
   GstCaps *input_caps;
 
+  GstFlowReturn last_ret;
+  GstClockTime position;
+  gboolean restart_download;
+
   /*
    * Need to store the status for the download and
    * stream tasks separately as they are working at
@@ -94,6 +98,10 @@ struct _GstDashDemuxStream
   gboolean has_data_queued;
 
   GstDataQueue *queue;
+
+  /* Download task */
+  GMutex download_mutex;
+  GCond download_cond;
   GstTask *download_task;
   GRecMutex download_task_lock;
 
@@ -124,6 +132,8 @@ struct _GstDashDemux
   GstBuffer *manifest;
   GstUriDownloader *downloader;
   GstMpdClient *client;         /* MPD client */
+  GMutex client_lock;
+
   gboolean end_of_period;
   gboolean end_of_manifest;
 
@@ -136,9 +146,6 @@ struct _GstDashDemux
   GstTask *stream_task;
   GRecMutex stream_task_lock;
 
-  /* Download task */
-  GMutex download_mutex;
-  GCond download_cond;
   gboolean cancelled;
 
   /* Manifest update */