mssdemux: implement bitrate switching
authorThiago Santos <thiago.sousa.santos@collabora.com>
Thu, 10 Jan 2013 18:16:36 +0000 (15:16 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 00:05:11 +0000 (21:05 -0300)
When connection-speed changes, signal that we might need a bitrate
switch. During the switch, a new pad group is added and the old one
is drained and removed.

New pads also need to push newsegments before starting to stream

ext/smoothstreaming/gstmssdemux.c
ext/smoothstreaming/gstmssdemux.h

index cb6cde3..faf4f5b 100644 (file)
@@ -124,9 +124,9 @@ gst_mss_demux_class_init (GstMssDemuxClass * klass)
   gobject_class->get_property = gst_mss_demux_get_property;
 
   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
-      g_param_spec_uint64 ("connection-speed", "Connection Speed",
+      g_param_spec_uint ("connection-speed", "Connection Speed",
           "Network connection speed in kbps (0 = unknown)",
-          0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
+          0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gstelement_class->change_state =
@@ -246,7 +246,12 @@ gst_mss_demux_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_CONNECTION_SPEED:
+      GST_OBJECT_LOCK (mssdemux);
       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
+      mssdemux->update_bitrates = TRUE;
+      GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
+          mssdemux->connection_speed);
+      GST_OBJECT_UNLOCK (mssdemux);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -369,6 +374,38 @@ gst_mss_demux_event (GstPad * pad, GstEvent * event)
   return ret;
 }
 
+static void
+gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
+{
+  GSList *iter;
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstMssDemuxStream *stream = iter->data;
+
+    if (immediate)
+      gst_uri_downloader_cancel (stream->downloader);
+    gst_task_pause (stream->stream_task);
+  }
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstMssDemuxStream *stream = iter->data;
+    g_static_rec_mutex_lock (&stream->stream_lock);
+  }
+}
+
+static void
+gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
+{
+  GSList *iter;
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstMssDemuxStream *stream = iter->data;
+    g_static_rec_mutex_unlock (&stream->stream_lock);
+  }
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstMssDemuxStream *stream = iter->data;
+
+    gst_task_start (stream->stream_task);
+  }
+}
+
 static gboolean
 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
 {
@@ -408,17 +445,7 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
         gst_event_unref (flush);
       }
 
-      /* stop the tasks */
-      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
-        GstMssDemuxStream *stream = iter->data;
-
-        gst_uri_downloader_cancel (stream->downloader);
-        gst_task_pause (stream->stream_task);
-      }
-      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
-        GstMssDemuxStream *stream = iter->data;
-        g_static_rec_mutex_lock (&stream->stream_lock);
-      }
+      gst_mss_demux_stop_tasks (mssdemux, TRUE);
 
       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
@@ -444,16 +471,7 @@ gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
         gst_event_unref (flush);
       }
 
-      /* restart tasks */
-      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
-        GstMssDemuxStream *stream = iter->data;
-        g_static_rec_mutex_unlock (&stream->stream_lock);
-      }
-      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
-        GstMssDemuxStream *stream = iter->data;
-
-        gst_task_start (stream->stream_task);
-      }
+      gst_mss_demux_restart_tasks (mssdemux);
 
       return TRUE;
     }
@@ -535,6 +553,41 @@ _set_src_pad_functions (GstPad * pad)
   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
 }
 
+static GstPad *
+_create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
+{
+  gchar *name;
+  GstPad *srcpad = NULL;
+  GstMssStreamType streamtype;
+
+  streamtype = gst_mss_stream_get_type (manifeststream);
+  GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
+      gst_mss_stream_type_name (streamtype));
+
+  /* TODO use stream's name/bitrate/index as the pad name? */
+  if (streamtype == MSS_STREAM_TYPE_VIDEO) {
+    name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
+    srcpad =
+        gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
+        name);
+    g_free (name);
+  } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
+    name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
+    srcpad =
+        gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
+        name);
+    g_free (name);
+  }
+
+  if (!srcpad) {
+    GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
+    return NULL;
+  }
+
+  _set_src_pad_functions (srcpad);
+  return srcpad;
+}
+
 static void
 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
 {
@@ -550,46 +603,29 @@ gst_mss_demux_create_streams (GstMssDemux * mssdemux)
   }
 
   for (iter = streams; iter; iter = g_slist_next (iter)) {
-    gchar *name;
     GstPad *srcpad = NULL;
     GstMssDemuxStream *stream = NULL;
     GstMssStream *manifeststream = iter->data;
-    GstMssStreamType streamtype;
-
-    streamtype = gst_mss_stream_get_type (manifeststream);
-    GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
-        gst_mss_stream_type_name (streamtype));
-
-    /* TODO use stream's name as the pad name? */
-    if (streamtype == MSS_STREAM_TYPE_VIDEO) {
-      name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
-      srcpad =
-          gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
-          name);
-      g_free (name);
-    } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
-      name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
-      srcpad =
-          gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
-          name);
-      g_free (name);
-    }
+
+    srcpad = _create_pad (mssdemux, manifeststream);
 
     if (!srcpad) {
-      GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
       continue;
     }
 
-    _set_src_pad_functions (srcpad);
-
     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
     gst_mss_stream_set_active (manifeststream, TRUE);
     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
   }
 
   /* select initial bitrates */
+  GST_OBJECT_LOCK (mssdemux);
+  GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
+      mssdemux->connection_speed);
   gst_mss_manifest_change_bitrate (mssdemux->manifest,
       mssdemux->connection_speed);
+  mssdemux->update_bitrates = FALSE;
+  GST_OBJECT_UNLOCK (mssdemux);
 }
 
 static gboolean
@@ -686,6 +722,47 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
 }
 
 static void
+gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
+{
+  GSList *oldpads = NULL;
+  GSList *iter;
+
+  gst_mss_demux_stop_tasks (mssdemux, FALSE);
+  if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
+          mssdemux->connection_speed)) {
+
+    GST_DEBUG_OBJECT (mssdemux, "Creating new pad group");
+    /* if we changed the bitrate, we need to add new pads */
+    for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+      GstMssDemuxStream *stream = iter->data;
+      GstClockTime ts =
+          gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
+
+      oldpads = g_slist_prepend (oldpads, stream->pad);
+
+      stream->pad = _create_pad (mssdemux, stream->manifest_stream);
+      /* TODO keep the same playback rate */
+      stream->pending_newsegment =
+          gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
+      gst_mss_demux_expose_stream (mssdemux, stream);
+    }
+
+    gst_element_no_more_pads (GST_ELEMENT (mssdemux));
+
+    for (iter = oldpads; iter; iter = g_slist_next (iter)) {
+      GstPad *oldpad = iter->data;
+
+      /* Push out EOS */
+      gst_pad_push_event (oldpad, gst_event_new_eos ());
+      gst_pad_set_active (oldpad, FALSE);
+      gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
+      gst_object_unref (oldpad);
+    }
+  }
+  gst_mss_demux_restart_tasks (mssdemux);
+}
+
+static void
 gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
 {
   GstMssDemux *mssdemux = stream->parent;
@@ -695,6 +772,20 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
   GstBuffer *buffer;
   GstFlowReturn ret;
 
+  GST_OBJECT_LOCK (mssdemux);
+  if (mssdemux->update_bitrates) {
+    mssdemux->update_bitrates = FALSE;
+    GST_OBJECT_UNLOCK (mssdemux);
+
+    GST_DEBUG_OBJECT (mssdemux,
+        "Starting streams reconfiguration due to bitrate changes");
+    g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
+        NULL);
+    GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
+  } else {
+    GST_OBJECT_UNLOCK (mssdemux);
+  }
+
   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
   switch (ret) {
@@ -732,6 +823,12 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
   GST_BUFFER_DURATION (buffer) =
       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
 
+  if (GST_BUFFER_TIMESTAMP (buffer) > 10 * GST_SECOND
+      && mssdemux->connection_speed != 1000) {
+    mssdemux->connection_speed = 1000;
+    mssdemux->update_bitrates = TRUE;
+  }
+
   if (G_UNLIKELY (stream->pending_newsegment)) {
     gst_pad_push_event (stream->pad, stream->pending_newsegment);
     stream->pending_newsegment = NULL;
@@ -764,14 +861,6 @@ eos:
     gst_task_stop (stream->stream_task);
     return;
   }
-no_url_error:
-  {
-    GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
-        (_("Failed to get fragment URL.")),
-        ("An error happened when getting fragment URL"));
-    gst_task_stop (stream->stream_task);
-    return;
-  }
 error:
   {
     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
index 54d87f9..c279cc2 100644 (file)
@@ -64,6 +64,7 @@ struct _GstMssDemuxStream {
   /* Streaming task */
   GstTask *stream_task;
   GStaticRecMutex stream_lock;
+
 };
 
 struct _GstMssDemux {
@@ -81,6 +82,8 @@ struct _GstMssDemux {
   guint n_videos;
   guint n_audios;
 
+  gboolean update_bitrates;
+
   /* properties */
   guint64 connection_speed; /* in bps */
 };