mssdemux: avoid downloading not-linked streams
authorThiago Santos <ts.santos@sisa.samsung.com>
Tue, 12 Nov 2013 12:58:31 +0000 (09:58 -0300)
committerThiago Santos <ts.santos@sisa.samsung.com>
Wed, 18 Dec 2013 21:52:18 +0000 (18:52 -0300)
When a stream gets a not-linked return, it will be marked as so and
won't download any more new fragments until a reconfigure event
is received. This will make mssdemux expose all pads, but only download
fragments for the streams that are actually being used.

Relying on the pads being linked/unlinked isn't enough in this scenario
as there might be an input-selector downstream that is actually discarding
buffers for a given linked pad.

When streams are switching, the old active stream can be blocked because
input-selector will block not-linked streams. In case the mssdemux's
stream loop is blocked pushing a buffer to a full queue downstream it will
never unblock as the queue will not drain (input-selector is blocking).

In this scenario, stream switching will deadlock as input-selector is
waiting for the newly active stream data and the stream_loop that would
push this data is blocked waiting for input-selector.

To solve this issue, whenever an stream is reactivated on a reconfigure
it will enter into the 'catch up mode', in this mode it can push buffers
from its download thread until it reaches the currrent GstSegment's position.
This works because this timestamp will always be behind or equal to the maximum
timestamp pushed for all streams, after pushing data for this timestamp,
the stream will go back to default and be pushed sequentially from the main
streaming thread. By this time, the input-selector should have already
released the thread.

https://bugzilla.gnome.org/show_bug.cgi?id=711849

ext/smoothstreaming/gstmssdemux.c
ext/smoothstreaming/gstmssdemux.h

index 3688639..fc7fa71 100644 (file)
@@ -234,6 +234,7 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
   stream->downloader = gst_uri_downloader_new ();
   stream->dataqueue =
       gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream);
+  g_mutex_init (&stream->mutex);
 
   /* Downloading task */
   g_rec_mutex_init (&stream->download_lock);
@@ -292,6 +293,7 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
   }
   if (stream->caps)
     gst_caps_unref (stream->caps);
+  g_mutex_clear (&stream->mutex);
   g_free (stream);
 }
 
@@ -619,6 +621,9 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
         GstMssDemuxStream *stream = iter->data;
 
         stream->eos = FALSE;
+        if (flags & GST_SEEK_FLAG_FLUSH) {
+          stream->last_ret = GST_FLOW_OK;
+        }
         gst_data_queue_flush (stream->dataqueue);
         gst_event_replace (&stream->pending_newsegment, newsegment);
       }
@@ -637,6 +642,26 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
       gst_event_unref (event);
       return TRUE;
     }
+    case GST_EVENT_RECONFIGURE:{
+      GSList *iter;
+
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *stream = iter->data;
+
+        if (stream->pad == pad) {
+          GST_MSS_DEMUX_STREAM_LOCK (stream);
+          if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED
+              && stream->last_ret == GST_FLOW_NOT_LINKED) {
+            stream->restart_download = TRUE;
+            gst_task_start (stream->download_task);
+          }
+          GST_MSS_DEMUX_STREAM_UNLOCK (stream);
+          gst_event_unref (event);
+          return TRUE;
+        }
+      }
+    }
+      break;
     default:
       break;
   }
@@ -1051,16 +1076,23 @@ gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
 
 static GstFlowReturn
 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
-    gboolean * buffer_downloaded)
+    GstBuffer ** _buffer)
 {
   GstMssDemux *mssdemux = stream->parent;
   gchar *path;
   gchar *url;
   GstFragment *fragment;
-  GstBuffer *_buffer;
+  GstBuffer *buffer;
   GstFlowReturn ret = GST_FLOW_OK;
   guint64 before_download, after_download;
 
+  /* special case for not-linked streams */
+  if (stream->last_ret == GST_FLOW_NOT_LINKED) {
+    GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p",
+        stream);
+    return GST_FLOW_NOT_LINKED;
+  }
+
   before_download = g_get_real_time ();
 
   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
@@ -1103,22 +1135,19 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
     return GST_FLOW_ERROR;
   }
 
-  _buffer = gst_fragment_get_buffer (fragment);
-  _buffer = gst_buffer_make_writable (_buffer);
-  GST_BUFFER_TIMESTAMP (_buffer) =
+  buffer = gst_fragment_get_buffer (fragment);
+  *_buffer = buffer = gst_buffer_make_writable (buffer);
+  GST_BUFFER_TIMESTAMP (buffer) =
       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
-  GST_BUFFER_DURATION (_buffer) =
+  GST_BUFFER_DURATION (buffer) =
       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
 
   g_object_unref (fragment);
 
-  if (buffer_downloaded)
-    *buffer_downloaded = _buffer != NULL;
-
   after_download = g_get_real_time ();
   if (_buffer) {
 #ifndef GST_DISABLE_GST_DEBUG
-    guint64 bitrate = (8 * gst_buffer_get_size (_buffer) * 1000000LLU) /
+    guint64 bitrate = (8 * gst_buffer_get_size (buffer) * 1000000LLU) /
         (after_download - before_download);
 #endif
 
@@ -1126,16 +1155,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
         "Measured download bitrate: %s %" G_GUINT64_FORMAT " bps",
         GST_PAD_NAME (stream->pad), bitrate);
     gst_download_rate_add_rate (&stream->download_rate,
-        gst_buffer_get_size (_buffer),
+        gst_buffer_get_size (buffer),
         1000 * (after_download - before_download));
-
-    GST_DEBUG_OBJECT (mssdemux,
-        "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
-        " Duration: %" GST_TIME_FORMAT,
-        stream, GST_PAD_NAME (stream->pad),
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
-        GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
-    gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
   }
 
   return ret;
@@ -1160,22 +1181,130 @@ static void
 gst_mss_demux_download_loop (GstMssDemuxStream * stream)
 {
   GstMssDemux *mssdemux = stream->parent;
-  gboolean buffer_downloaded = FALSE;
   GstFlowReturn ret;
+  GstBuffer *buffer = NULL;
+  gboolean buffer_downloaded = FALSE;
+  GstEvent *gap = NULL;
 
   GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
 
   GST_OBJECT_LOCK (mssdemux);
+  if (G_UNLIKELY (stream->restart_download)) {
+    GstClockTime cur, ts;
+    gint64 pos;
+
+    GST_MSS_DEMUX_STREAM_LOCK (stream);
+
+    GST_DEBUG_OBJECT (mssdemux,
+        "Activating stream %p due to reconfigure " "event", stream);
+
+    cur = GST_CLOCK_TIME_IS_VALID (stream->next_timestamp) ?
+        stream->next_timestamp : 0;
+
+    if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
+      ts = (GstClockTime) pos;
+      GST_DEBUG_OBJECT (mssdemux, "Downstream position: %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+    } else {
+      GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, "
+          "failling back to segment position");
+      ts = mssdemux->segment.position;
+    }
+
+    /* we might have already pushed this data */
+    ts = MAX (ts, stream->next_timestamp);
+
+    GST_DEBUG_OBJECT (mssdemux, "Restarting stream %p %s:%s at "
+        "position %" GST_TIME_FORMAT ", catching up until segment position %"
+        GST_TIME_FORMAT, stream, GST_DEBUG_PAD_NAME (stream->pad),
+        GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position));
+
+    if (GST_CLOCK_TIME_IS_VALID (ts)) {
+      gst_mss_stream_seek (stream->manifest_stream, ts);
+
+      if (cur < ts) {
+        gap = gst_event_new_gap (cur, ts - cur);
+      }
+    }
+
+    /* This stream might be entering into catching up mode,
+     * meaning that it will push buffers from this same download thread
+     * until it reaches the segment position.
+     *
+     * The reason for this is that in case of stream switching, the other
+     * stream that was previously active might be blocking the stream_loop
+     * in case it is ahead enough that all queues are filled.
+     * In this case, it is possible that a downstream input-selector is
+     * blocking waiting for the currently active stream to reach the
+     * same position of the old linked stream because of the 'sync-streams'
+     * behavior.
+     *
+     * We can push from this thread up to segment position as all other
+     * streams should be around the same timestamp.
+     */
+    stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+    stream->eos = FALSE;
+
+    gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+    stream->restart_download = FALSE;
+    gst_task_start (mssdemux->stream_task);
+    GST_MSS_DEMUX_STREAM_UNLOCK (stream);
+  }
+
   GST_DEBUG_OBJECT (mssdemux,
       "Starting streams reconfiguration due to bitrate changes");
   gst_mss_demux_reconfigure_stream (stream);
   GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
   GST_OBJECT_UNLOCK (mssdemux);
 
-  ret = gst_mss_demux_stream_download_fragment (stream, &buffer_downloaded);
+  if (gap != NULL)
+    gst_pad_push_event (stream->pad, gap);
+
+  ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+  buffer_downloaded = buffer != NULL;
 
-  if (stream->cancelled)
+  if (stream->cancelled) {
+    if (buffer)
+      gst_buffer_unref (buffer);
     goto cancelled;
+  }
+
+  if (buffer) {
+    gboolean catch_up = FALSE;
+
+    /* Check if this stream is on catch up mode */
+    if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
+      GST_DEBUG_OBJECT (mssdemux,
+          "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
+          GST_TIME_ARGS (mssdemux->segment.position),
+          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
+      if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) {
+        catch_up = TRUE;
+      } else {
+        GST_OBJECT_LOCK (mssdemux);
+        stream->last_ret = GST_FLOW_OK;
+        gst_task_start (mssdemux->stream_task);
+        GST_OBJECT_UNLOCK (mssdemux);
+      }
+    }
+
+    GST_DEBUG_OBJECT (mssdemux,
+        "%s buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
+        " Duration: %" GST_TIME_FORMAT,
+        catch_up ? "Catch up push for" : "Storing", stream,
+        GST_PAD_NAME (stream->pad),
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+        GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+    if (catch_up) {
+      ret = stream->last_ret = gst_pad_push (stream->pad, buffer);
+      if (G_LIKELY (ret == GST_FLOW_OK))
+        stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+      /* TODO handle return */
+    } else {
+      gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
+    }
+  }
 
   switch (ret) {
     case GST_FLOW_OK:
@@ -1184,6 +1313,8 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
       goto eos;
     case GST_FLOW_ERROR:
       goto error;
+    case GST_FLOW_NOT_LINKED:
+      goto notlinked;
     default:
       break;
   }
@@ -1203,7 +1334,10 @@ eos:
         GST_DEBUG_PAD_NAME (stream->pad));
     gst_mss_demux_stream_store_object (stream,
         GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
+    GST_OBJECT_LOCK (mssdemux);
     gst_task_pause (stream->download_task);
+    gst_task_start (mssdemux->stream_task);
+    GST_OBJECT_UNLOCK (mssdemux);
     return;
   }
 error:
@@ -1222,6 +1356,15 @@ cancelled:
     gst_task_pause (stream->download_task);
     return;
   }
+notlinked:
+  {
+    GST_MSS_DEMUX_STREAM_LOCK (stream);
+    if (stream->last_ret == GST_FLOW_NOT_LINKED) {
+      gst_task_pause (stream->download_task);
+      gst_data_queue_set_flushing (stream->dataqueue, TRUE);
+    }
+    GST_MSS_DEMUX_STREAM_UNLOCK (stream);
+  }
 }
 
 static GstFlowReturn
@@ -1242,12 +1385,18 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
     GstDataQueueItem *item;
 
     other = iter->data;
-    if (other->eos) {
+    if (other->eos || other->last_ret == GST_FLOW_NOT_LINKED) {
+      GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d",
+          other, other->eos, other->last_ret);
       continue;
     }
 
     if (!gst_data_queue_peek (other->dataqueue, &item)) {
       /* flushing */
+      if (other->last_ret == GST_FLOW_NOT_LINKED) {
+        /* might have been unlinked and won't receive data for now */
+        continue;
+      }
       return GST_FLOW_FLUSHING;
     }
 
@@ -1269,6 +1418,27 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
   return ret;
 }
 
+static GstFlowReturn
+gst_mss_demux_combine_flows (GstMssDemux * mssdemux)
+{
+  gboolean all_notlinked = TRUE;
+  GSList *iter;
+
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstMssDemuxStream *stream = iter->data;
+
+    if (stream->last_ret != GST_FLOW_NOT_LINKED)
+      all_notlinked = FALSE;
+
+    if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
+        || stream->last_ret == GST_FLOW_FLUSHING)
+      return stream->last_ret;
+  }
+  if (all_notlinked)
+    return GST_FLOW_NOT_LINKED;
+  return GST_FLOW_OK;
+}
+
 static void
 gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
 {
@@ -1289,6 +1459,8 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
     GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
         gst_flow_get_name (ret));
 
+  /* Lock as this may change the tasks state */
+  GST_OBJECT_LOCK (mssdemux);
   switch (ret) {
     case GST_FLOW_OK:
       break;
@@ -1302,6 +1474,7 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
     default:
       g_assert_not_reached ();
   }
+  GST_OBJECT_UNLOCK (mssdemux);
 
   GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
       stream, GST_PAD_NAME (stream->pad));
@@ -1332,17 +1505,20 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
 
     GST_DEBUG_OBJECT (mssdemux,
         "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
-        " discont:%d on pad %s", object,
+        " discont:%d on pad %s:%s", object,
         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
         GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
         GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
-        GST_PAD_NAME (stream->pad));
+        GST_DEBUG_PAD_NAME (stream->pad));
 
     stream->next_timestamp =
         GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
 
     stream->have_data = TRUE;
-    ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+    mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object);
+    stream->last_ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+    GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)",
+        GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
   } else if (GST_IS_EVENT (object)) {
     if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
       stream->eos = TRUE;
@@ -1350,21 +1526,27 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
     GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
         GST_PAD_NAME (stream->pad));
     gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
+    stream->last_ret = GST_FLOW_EOS;
   } else {
     g_return_if_reached ();
   }
 
+  /* Lock as this may change the tasks state */
+  GST_OBJECT_LOCK (mssdemux);
+  ret = gst_mss_demux_combine_flows (mssdemux);
   switch (ret) {
     case GST_FLOW_EOS:
       goto eos;                 /* EOS ? */
     case GST_FLOW_ERROR:
       goto error;
     case GST_FLOW_NOT_LINKED:
-      break;                    /* TODO what to do here? pause the task or just keep pushing? */
+      /* stream won't download any more data until it gets a reconfigure */
+      break;
     case GST_FLOW_OK:
     default:
       break;
   }
+  GST_OBJECT_UNLOCK (mssdemux);
 
   GST_LOG_OBJECT (mssdemux, "Stream loop end");
   return;
@@ -1373,18 +1555,21 @@ eos:
   {
     GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
     gst_task_pause (mssdemux->stream_task);
+    GST_OBJECT_UNLOCK (mssdemux);
     return;
   }
 error:
   {
     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
     gst_task_pause (mssdemux->stream_task);
+    GST_OBJECT_UNLOCK (mssdemux);
     return;
   }
 stop:
   {
     GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
     gst_task_pause (mssdemux->stream_task);
+    GST_OBJECT_UNLOCK (mssdemux);
     return;
   }
 }
index 6bc3ba4..d651b80 100644 (file)
@@ -52,9 +52,14 @@ typedef struct _GstMssDemuxStream GstMssDemuxStream;
 typedef struct _GstMssDemux GstMssDemux;
 typedef struct _GstMssDemuxClass GstMssDemuxClass;
 
+#define GST_MSS_DEMUX_STREAM_LOCK(s) g_mutex_lock (&(s)->mutex)
+#define GST_MSS_DEMUX_STREAM_UNLOCK(s) g_mutex_unlock (&(s)->mutex)
+
 struct _GstMssDemuxStream {
   GstPad *pad;
 
+  GMutex mutex;
+
   GstCaps *caps;
 
   GstMssDemux *parent;
@@ -72,9 +77,11 @@ struct _GstMssDemuxStream {
   GstTask *download_task;
   GRecMutex download_lock;
 
+  GstFlowReturn last_ret;
   gboolean eos;
   gboolean have_data;
   gboolean cancelled;
+  gboolean restart_download;
 
   GstDownloadRate download_rate;