mssdemux: rewriting pad tasks so that buffers are pushed by ts order
authorThiago Santos <thiago.sousa.santos@collabora.com>
Mon, 14 Jan 2013 16:21:10 +0000 (13:21 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 00:05:12 +0000 (21:05 -0300)
Use pad tasks to download data and an extra task that gets the earlier
buffer (with the smallest timestamp) and pushes on the corresponding
pad.

This prevents that the audio stream rushes ahead on buffers as its
fragments should be smaller

ext/smoothstreaming/gstmssdemux.c
ext/smoothstreaming/gstmssdemux.h

index 8e4e263feb0f54d7e0c46ca83c9fce74d4ac41a1..a596726c673e9bca335dbfed5c7988177d41bd18 100644 (file)
@@ -85,7 +85,8 @@ static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
 
 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
 
-static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
+static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
+static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
 
 static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
 
@@ -143,6 +144,23 @@ gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
   gst_pad_set_event_function (mssdemux->sinkpad,
       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
+
+  g_static_rec_mutex_init (&mssdemux->stream_lock);
+  mssdemux->stream_task =
+      gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
+  gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
+}
+
+static gboolean
+_data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
+    guint64 time, gpointer checkdata)
+{
+  GstMssDemuxStream *stream = checkdata;
+  GstMssDemux *mssdemux = stream->parent;
+
+  if (mssdemux->data_queue_max_size == 0)
+    return FALSE;               /* never full */
+  return visible >= mssdemux->data_queue_max_size;
 }
 
 static GstMssDemuxStream *
@@ -153,12 +171,13 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
 
   stream = g_new0 (GstMssDemuxStream, 1);
   stream->downloader = gst_uri_downloader_new ();
+  stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
 
-  /* Streaming task */
-  g_static_rec_mutex_init (&stream->stream_lock);
-  stream->stream_task =
-      gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
-  gst_task_set_lock (stream->stream_task, &stream->stream_lock);
+  /* Downloading task */
+  g_static_rec_mutex_init (&stream->download_lock);
+  stream->download_task =
+      gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
+  gst_task_set_lock (stream->download_task, &stream->download_lock);
 
   stream->pad = srcpad;
   stream->manifest_stream = manifeststream;
@@ -170,20 +189,20 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux,
 static void
 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
 {
-  if (stream->stream_task) {
-    if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
+  if (stream->download_task) {
+    if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
           GST_DEBUG_PAD_NAME (stream->pad));
-      gst_task_stop (stream->stream_task);
-      g_static_rec_mutex_lock (&stream->stream_lock);
-      g_static_rec_mutex_unlock (&stream->stream_lock);
+      gst_task_stop (stream->download_task);
+      g_static_rec_mutex_lock (&stream->download_lock);
+      g_static_rec_mutex_unlock (&stream->download_lock);
       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
-      gst_task_join (stream->stream_task);
+      gst_task_join (stream->download_task);
       GST_LOG_OBJECT (stream->parent, "Finished");
     }
-    gst_object_unref (stream->stream_task);
-    g_static_rec_mutex_free (&stream->stream_lock);
-    stream->stream_task = NULL;
+    gst_object_unref (stream->download_task);
+    g_static_rec_mutex_free (&stream->download_lock);
+    stream->download_task = NULL;
   }
 
   if (stream->pending_newsegment) {
@@ -196,6 +215,10 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
     g_object_unref (stream->downloader);
     stream->downloader = NULL;
   }
+  if (stream->dataqueue) {
+    g_object_unref (stream->dataqueue);
+    stream->dataqueue = NULL;
+  }
   if (stream->pad) {
     gst_object_unref (stream->pad);
     stream->pad = NULL;
@@ -207,6 +230,14 @@ static void
 gst_mss_demux_reset (GstMssDemux * mssdemux)
 {
   GSList *iter;
+
+  if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
+    gst_task_stop (mssdemux->stream_task);
+    g_static_rec_mutex_lock (&mssdemux->stream_lock);
+    g_static_rec_mutex_unlock (&mssdemux->stream_lock);
+    gst_task_join (mssdemux->stream_task);
+  }
+
   if (mssdemux->manifest_buffer) {
     gst_buffer_unref (mssdemux->manifest_buffer);
     mssdemux->manifest_buffer = NULL;
@@ -233,7 +264,13 @@ gst_mss_demux_reset (GstMssDemux * mssdemux)
 static void
 gst_mss_demux_dispose (GObject * object)
 {
-  /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
+  GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
+
+  if (mssdemux->stream_task) {
+    gst_object_unref (mssdemux->stream_task);
+    g_static_rec_mutex_free (&mssdemux->stream_lock);
+    mssdemux->stream_task = NULL;
+  }
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
@@ -325,8 +362,10 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
-    gst_task_start (stream->stream_task);
+    gst_task_start (stream->download_task);
   }
+
+  gst_task_start (mssdemux->stream_task);
 }
 
 static gboolean
@@ -378,17 +417,23 @@ static void
 gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
 {
   GSList *iter;
+
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
 
+    gst_data_queue_set_flushing (stream->dataqueue, TRUE);
+
     if (immediate)
       gst_uri_downloader_cancel (stream->downloader);
-    gst_task_pause (stream->stream_task);
+    gst_task_pause (stream->download_task);
   }
+  gst_task_pause (mssdemux->stream_task);
+
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
-    g_static_rec_mutex_lock (&stream->stream_lock);
+    g_static_rec_mutex_lock (&stream->download_lock);
   }
+  g_static_rec_mutex_lock (&mssdemux->stream_lock);
 }
 
 static void
@@ -397,13 +442,16 @@ gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
   GSList *iter;
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
-    g_static_rec_mutex_unlock (&stream->stream_lock);
+    g_static_rec_mutex_unlock (&stream->download_lock);
   }
+  g_static_rec_mutex_unlock (&mssdemux->stream_lock);
   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
     GstMssDemuxStream *stream = iter->data;
 
-    gst_task_start (stream->stream_task);
+    gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+    gst_task_start (stream->download_task);
   }
+  gst_task_start (mssdemux->stream_task);
 }
 
 static gboolean
@@ -458,6 +506,8 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
         GstMssDemuxStream *stream = iter->data;
 
+        stream->eos = FALSE;
+        gst_data_queue_flush (stream->dataqueue);
         stream->pending_newsegment = gst_event_ref (newsegment);
       }
       gst_event_unref (newsegment);
@@ -727,7 +777,7 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
   GSList *oldpads = NULL;
   GSList *iter;
 
-  gst_mss_demux_stop_tasks (mssdemux, FALSE);
+  gst_mss_demux_stop_tasks (mssdemux, TRUE);
   if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
           mssdemux->connection_speed)) {
 
@@ -736,15 +786,46 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
       GstMssDemuxStream *stream = iter->data;
       GstPad *oldpad = stream->pad;
-      GstClockTime ts =
-          gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
+      GstClockTime ts = GST_CLOCK_TIME_NONE;
 
       oldpads = g_slist_prepend (oldpads, oldpad);
 
+      /* since we are flushing the queue, get the next un-pushed timestamp to seek
+       * and avoid gaps */
+      gst_data_queue_set_flushing (stream->dataqueue, FALSE);
+      if (!gst_data_queue_is_empty (stream->dataqueue)) {
+        GstDataQueueItem *item = NULL;
+
+        while (!gst_data_queue_is_empty (stream->dataqueue)
+            && !GST_CLOCK_TIME_IS_VALID (ts)) {
+          gst_data_queue_pop (stream->dataqueue, &item);
+
+          if (!item) {
+            g_assert_not_reached ();
+            break;
+          }
+
+          if (GST_IS_BUFFER (item->object)) {
+            GstBuffer *buffer = GST_BUFFER_CAST (item->object);
+
+            ts = GST_BUFFER_TIMESTAMP (buffer);
+          }
+          item->destroy (item);
+        }
+
+      }
+      if (!GST_CLOCK_TIME_IS_VALID (ts)) {
+        ts = gst_mss_stream_get_fragment_gst_timestamp
+            (stream->manifest_stream);
+      }
+
+      GST_DEBUG_OBJECT (mssdemux,
+          "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
+          GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
+      gst_mss_stream_seek (stream->manifest_stream, ts);
+      gst_data_queue_flush (stream->dataqueue);
+
       stream->pad = _create_pad (mssdemux, stream->manifest_stream);
-      /* TODO keep the same playback rate */
-      stream->pending_newsegment =
-          gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
       gst_mss_demux_expose_stream (mssdemux, stream);
 
       gst_pad_push_event (oldpad, gst_event_new_eos ());
@@ -763,6 +844,37 @@ gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
   gst_mss_demux_restart_tasks (mssdemux);
 }
 
+static void
+_free_data_queue_item (gpointer obj)
+{
+  GstDataQueueItem *item = obj;
+
+  gst_mini_object_unref (item->object);
+  g_slice_free (GstDataQueueItem, item);
+}
+
+static void
+gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
+    GstMiniObject * obj)
+{
+  GstDataQueueItem *item;
+
+  item = g_slice_new (GstDataQueueItem);
+  item->object = (GstMiniObject *) obj;
+
+  item->duration = 0;           /* we don't care */
+  item->size = 0;
+  item->visible = TRUE;
+
+  item->destroy = (GDestroyNotify) _free_data_queue_item;
+
+  if (!gst_data_queue_push (stream->dataqueue, item)) {
+    GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
+    gst_mini_object_unref (obj);
+    g_slice_free (GstDataQueueItem, item);
+  }
+}
+
 static GstFlowReturn
 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
     GstBuffer ** buffer)
@@ -811,7 +923,17 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
   GST_BUFFER_DURATION (_buffer) =
       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
 
-  *buffer = _buffer;
+  if (buffer)
+    *buffer = _buffer;
+
+  if (_buffer) {
+    GST_DEBUG_OBJECT (mssdemux,
+        "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT,
+        stream, GST_PAD_NAME (stream->pad),
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)));
+    gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
+  }
+
   return ret;
 
 no_url_error:
@@ -819,24 +941,118 @@ no_url_error:
     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
         (_("Failed to get fragment URL.")),
         ("An error happened when getting fragment URL"));
-    gst_task_stop (stream->stream_task);
+    gst_task_stop (stream->download_task);
     return GST_FLOW_ERROR;
   }
 error:
   {
     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
-    gst_task_stop (stream->stream_task);
+    gst_task_stop (stream->download_task);
     return GST_FLOW_ERROR;
   }
 }
 
 static void
-gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
+gst_mss_demux_download_loop (GstMssDemuxStream * stream)
 {
   GstMssDemux *mssdemux = stream->parent;
   GstBuffer *buffer = NULL;
   GstFlowReturn ret;
 
+  GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
+
+
+  ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+  switch (ret) {
+    case GST_FLOW_OK:
+      break;                    /* all is good, let's go */
+    case GST_FLOW_UNEXPECTED:  /* EOS */
+      goto eos;
+    case GST_FLOW_ERROR:
+      goto error;
+    default:
+      break;
+  }
+
+  g_assert (buffer != NULL);
+
+  gst_mss_stream_advance_fragment (stream->manifest_stream);
+  GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
+  return;
+
+eos:
+  {
+    GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
+        GST_DEBUG_PAD_NAME (stream->pad));
+    gst_mss_demux_stream_store_object (stream,
+        GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
+    gst_task_stop (stream->download_task);
+    return;
+  }
+error:
+  {
+    GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
+    gst_task_stop (stream->download_task);
+    return;
+  }
+}
+
+static GstFlowReturn
+gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
+    GstMssDemuxStream ** stream)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstMssDemuxStream *current = NULL;
+  GstClockTime cur_time = GST_CLOCK_TIME_NONE;
+  GSList *iter;
+
+  if (!mssdemux->streams)
+    return GST_FLOW_ERROR;
+
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstClockTime time;
+    GstMssDemuxStream *other;
+    GstDataQueueItem *item;
+
+    other = iter->data;
+    if (other->eos) {
+      continue;
+    }
+
+    if (gst_data_queue_peek (other->dataqueue, &item)) {
+    } else {
+      /* flushing */
+      return GST_FLOW_WRONG_STATE;
+    }
+
+    if (GST_IS_EVENT (item->object)) {
+      /* events have higher priority */
+      current = other;
+      break;
+    }
+    time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
+    if (time < cur_time) {
+      cur_time = time;
+      current = other;
+    }
+  }
+
+  *stream = current;
+  if (current == NULL)
+    ret = GST_FLOW_UNEXPECTED;
+  return ret;
+}
+
+static void
+gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
+{
+  GstMssDemuxStream *stream = NULL;
+  GstFlowReturn ret;
+  GstMiniObject *object = NULL;
+  GstDataQueueItem *item = NULL;
+
+  GST_LOG_OBJECT (mssdemux, "Starting stream loop");
+
   GST_OBJECT_LOCK (mssdemux);
   if (mssdemux->update_bitrates) {
     mssdemux->update_bitrates = FALSE;
@@ -844,37 +1060,70 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
 
     GST_DEBUG_OBJECT (mssdemux,
         "Starting streams reconfiguration due to bitrate changes");
-    g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
-        NULL);
+    gst_mss_demux_reconfigure (mssdemux);
     GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
-    gst_task_stop (stream->stream_task);
-    return;
   } else {
     GST_OBJECT_UNLOCK (mssdemux);
   }
 
-  ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+  ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
+
+  if (stream)
+    GST_DEBUG_OBJECT (mssdemux,
+        "Stream loop selected %p stream of pad %s. %d - %s", stream,
+        GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
+  else
+    GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
+        gst_flow_get_name (ret));
+
   switch (ret) {
     case GST_FLOW_OK:
-      break;                    /* all is good, let's go */
-    case GST_FLOW_UNEXPECTED:  /* EOS */
-      goto eos;
+      break;
     case GST_FLOW_ERROR:
       goto error;
+    case GST_FLOW_UNEXPECTED:
+      goto eos;
+    case GST_FLOW_WRONG_STATE:
+      GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
+      goto stop;
     default:
-      break;
+      g_assert_not_reached ();
   }
 
-  g_assert (buffer != NULL);
+  GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
+      stream, GST_PAD_NAME (stream->pad));
+  if (gst_data_queue_pop (stream->dataqueue, &item)) {
+    if (item->object)
+      object = gst_mini_object_ref (item->object);
+    item->destroy (item);
+  } else {
+    GST_DEBUG_OBJECT (mssdemux,
+        "Failed to get object from dataqueue on stream %p %s", stream,
+        GST_PAD_NAME (stream->pad));
+    goto stop;
+  }
 
   if (G_UNLIKELY (stream->pending_newsegment)) {
     gst_pad_push_event (stream->pad, stream->pending_newsegment);
     stream->pending_newsegment = NULL;
   }
 
-  GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
-      GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
-  ret = gst_pad_push (stream->pad, buffer);
+  if (G_LIKELY (GST_IS_BUFFER (object))) {
+    GST_DEBUG_OBJECT (mssdemux,
+        "Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object,
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
+        GST_PAD_NAME (stream->pad));
+    ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
+  } else if (GST_IS_EVENT (object)) {
+    if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
+      stream->eos = TRUE;
+    GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
+        GST_PAD_NAME (stream->pad));
+    gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
+  } else {
+    g_return_if_reached ();
+  }
+
   switch (ret) {
     case GST_FLOW_UNEXPECTED:
       goto eos;                 /* EOS ? */
@@ -887,22 +1136,25 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
       break;
   }
 
-  gst_mss_stream_advance_fragment (stream->manifest_stream);
+  GST_LOG_OBJECT (mssdemux, "Stream loop end");
   return;
 
 eos:
   {
-    GstEvent *eos = gst_event_new_eos ();
-    GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
-        GST_DEBUG_PAD_NAME (stream->pad));
-    gst_pad_push_event (stream->pad, eos);
-    gst_task_stop (stream->stream_task);
+    GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
+    gst_task_stop (mssdemux->stream_task);
     return;
   }
 error:
   {
     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
-    gst_task_stop (stream->stream_task);
+    gst_task_stop (mssdemux->stream_task);
+    return;
+  }
+stop:
+  {
+    GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
+    gst_task_stop (mssdemux->stream_task);
     return;
   }
 }
index c279cc2b7e74590e6f3c671bb435788c0d712942..ceb471bdaf6468fdaec0edd49738a0aecc47b149 100644 (file)
@@ -25,6 +25,7 @@
 
 #include <gst/gst.h>
 #include <gst/base/gstadapter.h>
+#include <gst/base/gstdataqueue.h>
 #include "gstmssmanifest.h"
 #include "gsturidownloader.h"
 
@@ -58,13 +59,15 @@ struct _GstMssDemuxStream {
   GstMssStream *manifest_stream;
 
   GstUriDownloader *downloader;
+  GstDataQueue *dataqueue;
 
   GstEvent *pending_newsegment;
 
-  /* Streaming task */
-  GstTask *stream_task;
-  GStaticRecMutex stream_lock;
+  /* Downloading task */
+  GstTask *download_task;
+  GStaticRecMutex download_lock;
 
+  gboolean eos;
 };
 
 struct _GstMssDemux {
@@ -84,8 +87,13 @@ struct _GstMssDemux {
 
   gboolean update_bitrates;
 
+  /* Streaming task */
+  GstTask *stream_task;
+  GStaticRecMutex stream_lock;
+
   /* properties */
   guint64 connection_speed; /* in bps */
+  guint data_queue_max_size;
 };
 
 struct _GstMssDemuxClass {